This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1f2bb57038a5052c7577316c852b54b9158ffd75
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sat May 4 08:06:00 2024 +0100

    Fix alembic autogeneration and rename mismatching constraints (#39032)
    
    * Fix alembic autogeneration and rename mismatching constraints
    
    The alembic autogeneration is not working as expected and the tests
    were detecting it because we use DBs created from the ORM to run tests.
    When a change is made in the ORM and the ORM is used to initialize the
    database for tests, the changes in the ORM will appear the same with what
    is in the migration file. To be sure that both match, we have to compare
    the database generated using the migration file to the database that could 
be
    created from the ORM.
    To fix this, I added 'use_migration_file' arg to resetdb function and 
updated
    the db reset in conftest to use migration file during test db reset.
    
    As part of this fix, I also updated mismatching constraint names. The 
update was
    done in the migration file instead of the ORM as I take the ORM as the 
source
    of truth. New airflow users create their DB from the ORM with the correct 
naming
    because we have a naming convention. Old airflow users would have to upgrade
    to use these names from ORM instead of the reverse.
    
    I also removed the `sqlite_sequence` table which is specific to sqlite and 
not
    needed for anything. An alternative would be to add `sqlite_autoincrement` 
to
    table args in the ORM and migration but this table is not that useful.
    
    * fixup! Fix alembic autogeneration and rename mismatching constraints
    
    * fixup! fixup! Fix alembic autogeneration and rename mismatching 
constraints
    
    * fixup! fixup! fixup! Fix alembic autogeneration and rename mismatching 
constraints
    
    * Fix mysql, sqlite and issue with cascading deletes
    
    * fixup! Fix mysql, sqlite and issue with cascading deletes
    
    * Fix migration for mysql
    
    * Fix clear_number ORM server_default
    
    * Fix type change for mysql
    
    * Fix constraints for sqlite and move migration to 2.9.2
    
    * Fix sqlite constraints update and ignore session_session_id_uq index
    
    * Fix processor_subdir in the migration file for mysql and make 
use-migration-files an option in db commands
    
    * fixup! Fix processor_subdir in the migration file for mysql and make 
use-migration-files an option in db commands
    
    * Apply suggestions from code review
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    (cherry picked from commit 00f096918df2c9cb13589924cb5c31633be5e6f8)
---
 airflow/cli/cli_config.py                          |  10 +-
 airflow/cli/commands/db_command.py                 |   3 +-
 airflow/migrations/env.py                          |   3 +
 ...nconsistency_between_ORM_and_migration_files.py | 297 +++++++++++++++++++++
 airflow/models/dagrun.py                           |   2 +-
 airflow/utils/db.py                                |  15 +-
 docs/apache-airflow/img/airflow_erd.sha256         |   2 +-
 docs/apache-airflow/img/airflow_erd.svg            |   4 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/cli/commands/test_db_command.py              |  66 ++++-
 tests/conftest.py                                  |   2 +-
 ... => example_external_task_child_deferrable.py } |   0
 tests/utils/test_db.py                             |   3 +-
 13 files changed, 383 insertions(+), 28 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index cd2a25b074..82b00f0ff2 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -691,6 +691,12 @@ ARG_DB_SKIP_INIT = Arg(
     action="store_true",
     default=False,
 )
+ARG_DB_USE_MIGRATION_FILES = Arg(
+    ("-m", "--use-migration-files"),
+    help="Use migration files to perform migration",
+    action="store_true",
+    default=False,
+)
 
 # webserver
 ARG_PORT = Arg(
@@ -1525,7 +1531,7 @@ DB_COMMANDS = (
         name="reset",
         help="Burn down and rebuild the metadata database",
         func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
-        args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
+        args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_DB_USE_MIGRATION_FILES, 
ARG_VERBOSE),
     ),
     ActionCommand(
         name="upgrade",
@@ -1545,6 +1551,7 @@ DB_COMMANDS = (
             ARG_DB_FROM_REVISION,
             ARG_DB_FROM_VERSION,
             ARG_DB_RESERIALIZE_DAGS,
+            ARG_DB_USE_MIGRATION_FILES,
             ARG_VERBOSE,
         ),
         hide=True,
@@ -1568,6 +1575,7 @@ DB_COMMANDS = (
             ARG_DB_FROM_REVISION,
             ARG_DB_FROM_VERSION,
             ARG_DB_RESERIALIZE_DAGS,
+            ARG_DB_USE_MIGRATION_FILES,
             ARG_VERBOSE,
         ),
     ),
diff --git a/airflow/cli/commands/db_command.py 
b/airflow/cli/commands/db_command.py
index 184496df42..8c2601a7af 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -61,7 +61,7 @@ def resetdb(args):
     print(f"DB: {settings.engine.url!r}")
     if not (args.yes or input("This will drop existing tables if they exist. 
Proceed? (y/n)").upper() == "Y"):
         raise SystemExit("Cancelled")
-    db.resetdb(skip_init=args.skip_init)
+    db.resetdb(skip_init=args.skip_init, 
use_migration_files=args.use_migration_files)
 
 
 def upgradedb(args):
@@ -132,6 +132,7 @@ def migratedb(args):
         from_revision=from_revision,
         show_sql_only=args.show_sql_only,
         reserialize_dags=args.reserialize_dags,
+        use_migration_files=args.use_migration_files,
     )
     if not args.show_sql_only:
         print("Database migrating done!")
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index c521654c1c..7a5854bb06 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -29,6 +29,9 @@ from airflow.utils.db import compare_server_default, 
compare_type
 
 def include_object(_, name, type_, *args):
     """Filter objects for autogenerating revisions."""
+    # Ignore the sqlite_sequence table, which is an internal SQLite construct
+    if name == "sqlite_sequence":
+        return False
     # Ignore _anything_ to do with Celery, or FlaskSession's tables
     if type_ == "table" and (name.startswith("celery_") or name == "session"):
         return False
diff --git 
a/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
 
b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
new file mode 100644
index 0000000000..3d07b67e3a
--- /dev/null
+++ 
b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Fix inconsistency between ORM and migration files.
+
+Revision ID: 686269002441
+Revises: bff083ad727d
+Create Date: 2024-04-15 14:19:49.913797
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import literal
+
+# revision identifiers, used by Alembic.
+revision = "686269002441"
+down_revision = "bff083ad727d"
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.2"
+
+
+def upgrade():
+    """Apply Update missing constraints."""
+    conn = op.get_bind()
+    if conn.dialect.name == "mysql":
+        # TODO: Rewrite these queries to use alembic when lowest MYSQL version 
supports IF EXISTS
+        conn.execute(
+            sa.text("""
+        set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS 
WHERE
+            CONSTRAINT_SCHEMA = DATABASE() AND
+            TABLE_NAME        = 'connection' AND
+            CONSTRAINT_NAME   = 'unique_conn_id' AND
+            CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE connection
+            drop constraint unique_conn_id','select 1');
+
+        prepare stmt from @var;
+        execute stmt;
+        deallocate prepare stmt;
+        """)
+        )
+        # Dropping the below and recreating cause there's no IF NOT EXISTS in 
mysql
+        conn.execute(
+            sa.text("""
+                set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
+                    CONSTRAINT_SCHEMA = DATABASE() AND
+                    TABLE_NAME        = 'connection' AND
+                    CONSTRAINT_NAME   = 'connection_conn_id_uq' AND
+                    CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE 
connection
+                    drop constraint connection_conn_id_uq','select 1');
+
+                prepare stmt from @var;
+                execute stmt;
+                deallocate prepare stmt;
+                """)
+        )
+    elif conn.dialect.name == "sqlite":
+        # SQLite does not support DROP CONSTRAINT
+        # We have to recreate the table without the constraint
+        conn.execute(sa.text("PRAGMA foreign_keys=off"))
+        conn.execute(
+            sa.text("""
+        CREATE TABLE connection_new (
+                id INTEGER NOT NULL,
+                conn_id VARCHAR(250) NOT NULL,
+                conn_type VARCHAR(500) NOT NULL,
+                host VARCHAR(500),
+                schema VARCHAR(500),
+                login TEXT,
+                password TEXT,
+                port INTEGER,
+                extra TEXT,
+                is_encrypted BOOLEAN,
+                is_extra_encrypted BOOLEAN,
+                description VARCHAR(5000),
+                CONSTRAINT connection_pkey PRIMARY KEY (id),
+                CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
+        )
+        """)
+        )
+        conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM 
connection"))
+        conn.execute(sa.text("DROP TABLE connection"))
+        conn.execute(sa.text("ALTER TABLE connection_new RENAME TO 
connection"))
+        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+    else:
+        op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS 
unique_conn_id")
+        # Dropping and recreating cause there's no IF NOT EXISTS
+        op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS 
connection_conn_id_uq")
+
+    with op.batch_alter_table("connection") as batch_op:
+        batch_op.create_unique_constraint(batch_op.f("connection_conn_id_uq"), 
["conn_id"])
+
+    max_cons = sa.table("dag", sa.column("max_consecutive_failed_dag_runs"))
+    
op.execute(max_cons.update().values(max_consecutive_failed_dag_runs=literal("0")))
+    with op.batch_alter_table("dag") as batch_op:
+        batch_op.alter_column("max_consecutive_failed_dag_runs", 
existing_type=sa.Integer(), nullable=False)
+
+    with op.batch_alter_table("task_instance") as batch_op:
+        batch_op.drop_constraint("task_instance_dag_run_fkey", 
type_="foreignkey")
+
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_dr_fkey", type_="foreignkey")
+
+    if conn.dialect.name == "mysql":
+        conn.execute(
+            sa.text("""
+                        set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
+                            CONSTRAINT_SCHEMA = DATABASE() AND
+                            TABLE_NAME        = 'dag_run' AND
+                            CONSTRAINT_NAME   = 
'dag_run_dag_id_execution_date_uq' AND
+                            CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE 
dag_run
+                            drop constraint 
dag_run_dag_id_execution_date_uq','select 1');
+
+                        prepare stmt from @var;
+                        execute stmt;
+                        deallocate prepare stmt;
+                        """)
+        )
+        conn.execute(
+            sa.text("""
+                        set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
+                            CONSTRAINT_SCHEMA = DATABASE() AND
+                            TABLE_NAME        = 'dag_run' AND
+                            CONSTRAINT_NAME   = 'dag_run_dag_id_run_id_uq' AND
+                            CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE 
dag_run
+                            drop constraint dag_run_dag_id_run_id_uq','select 
1');
+
+                        prepare stmt from @var;
+                        execute stmt;
+                        deallocate prepare stmt;
+                        """)
+        )
+        # below we drop and recreate the constraints because there's no IF NOT 
EXISTS
+        conn.execute(
+            sa.text("""
+                                set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
+                                    CONSTRAINT_SCHEMA = DATABASE() AND
+                                    TABLE_NAME        = 'dag_run' AND
+                                    CONSTRAINT_NAME   = 
'dag_run_dag_id_execution_date_key' AND
+                                    CONSTRAINT_TYPE   = 'UNIQUE') = 
true,'ALTER TABLE dag_run
+                                    drop constraint 
dag_run_dag_id_execution_date_key','select 1');
+
+                                prepare stmt from @var;
+                                execute stmt;
+                                deallocate prepare stmt;
+                                """)
+        )
+        conn.execute(
+            sa.text("""
+                            set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
+                                CONSTRAINT_SCHEMA = DATABASE() AND
+                                TABLE_NAME        = 'dag_run' AND
+                                CONSTRAINT_NAME   = 
'dag_run_dag_id_run_id_key' AND
+                                CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER 
TABLE dag_run
+                                drop constraint 
dag_run_dag_id_run_id_key','select 1');
+
+                            prepare stmt from @var;
+                            execute stmt;
+                            deallocate prepare stmt;
+                            """)
+        )
+        with op.batch_alter_table("callback_request", schema=None) as batch_op:
+            batch_op.alter_column(
+                "processor_subdir",
+                existing_type=sa.Text(length=2000),
+                type_=sa.String(length=2000),
+                existing_nullable=True,
+            )
+
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.alter_column(
+                "processor_subdir",
+                existing_type=sa.Text(length=2000),
+                type_=sa.String(length=2000),
+                existing_nullable=True,
+            )
+
+        with op.batch_alter_table("import_error", schema=None) as batch_op:
+            batch_op.alter_column(
+                "processor_subdir",
+                existing_type=sa.Text(length=2000),
+                type_=sa.String(length=2000),
+                existing_nullable=True,
+            )
+
+        with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
+            batch_op.alter_column(
+                "processor_subdir",
+                existing_type=sa.Text(length=2000),
+                type_=sa.String(length=2000),
+                existing_nullable=True,
+            )
+
+    elif conn.dialect.name == "sqlite":
+        # SQLite does not support DROP CONSTRAINT
+        # We have to recreate the table without the constraint
+        conn.execute(sa.text("PRAGMA foreign_keys=off"))
+        conn.execute(
+            sa.text("""
+            CREATE TABLE dag_run_new (
+                id INTEGER NOT NULL,
+                dag_id VARCHAR(250) NOT NULL,
+                queued_at TIMESTAMP,
+                execution_date TIMESTAMP NOT NULL,
+                start_date TIMESTAMP,
+                end_date TIMESTAMP,
+                state VARCHAR(50),
+                run_id VARCHAR(250) NOT NULL,
+                creating_job_id INTEGER,
+                external_trigger BOOLEAN,
+                run_type VARCHAR(50) NOT NULL,
+                conf BLOB,
+                data_interval_start TIMESTAMP,
+                data_interval_end TIMESTAMP,
+                last_scheduling_decision TIMESTAMP,
+                dag_hash VARCHAR(32),
+                log_template_id INTEGER,
+                updated_at TIMESTAMP,
+                clear_number INTEGER DEFAULT '0' NOT NULL,
+                CONSTRAINT dag_run_pkey PRIMARY KEY (id),
+                CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, 
execution_date),
+                CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
+                CONSTRAINT task_instance_log_template_id_fkey FOREIGN 
KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+            )
+        """)
+        )
+
+        conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run"))
+        conn.execute(sa.text("DROP TABLE dag_run"))
+        conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
+        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+        with op.batch_alter_table("dag_run") as batch_op:
+            batch_op.create_index("dag_id_state", ["dag_id", "state"], 
if_not_exists=True)
+            batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], 
if_not_exists=True)
+            batch_op.create_index(
+                "idx_dag_run_running_dags",
+                ["state", "dag_id"],
+                sqlite_where=sa.text("state='running'"),
+                if_not_exists=True,
+            )
+            batch_op.create_index(
+                "idx_dag_run_queued_dags",
+                ["state", "dag_id"],
+                sqlite_where=sa.text("state='queued'"),
+                if_not_exists=True,
+            )
+
+    else:
+        op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS 
dag_run_dag_id_execution_date_uq")
+        op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS 
dag_run_dag_id_run_id_uq")
+        # below we drop and recreate the constraints because there's no IF NOT 
EXISTS
+        op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS 
dag_run_dag_id_execution_date_key")
+        op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS 
dag_run_dag_id_run_id_key")
+
+    with op.batch_alter_table("dag_run") as batch_op:
+        batch_op.create_unique_constraint("dag_run_dag_id_execution_date_key", 
["dag_id", "execution_date"])
+        batch_op.create_unique_constraint("dag_run_dag_id_run_id_key", 
["dag_id", "run_id"])
+
+    with op.batch_alter_table("task_instance") as batch_op:
+        batch_op.create_foreign_key(
+            "task_instance_dag_run_fkey",
+            "dag_run",
+            ["dag_id", "run_id"],
+            ["dag_id", "run_id"],
+            ondelete="CASCADE",
+        )
+
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.create_foreign_key(
+            "task_reschedule_dr_fkey",
+            "dag_run",
+            ["dag_id", "run_id"],
+            ["dag_id", "run_id"],
+            ondelete="CASCADE",
+        )
+
+
+def downgrade():
+    """NO downgrade because this is to make ORM consistent with the 
database."""
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 522cb3e055..c6fc69195f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -147,7 +147,7 @@ class DagRun(Base, LoggingMixin):
     # Keeps track of the number of times the dagrun had been cleared.
     # This number is incremented only when the DagRun is re-Queued,
     # when the DagRun is cleared.
-    clear_number = Column(Integer, default=0, nullable=False)
+    clear_number = Column(Integer, default=0, nullable=False, 
server_default="0")
 
     # Remove this `if` after upgrading Sphinx-AutoAPI
     if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2e30d84ffd..8bd7d976ad 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -91,7 +91,7 @@ _REVISION_HEADS_MAP = {
     "2.8.0": "10b52ebd31f7",
     "2.8.1": "88344c1d9134",
     "2.9.0": "1949afb29106",
-    "2.9.2": "bff083ad727d",
+    "2.9.2": "686269002441",
 }
 
 
@@ -743,13 +743,13 @@ def _create_db_from_orm(session):
 
 
 @provide_session
-def initdb(session: Session = NEW_SESSION, load_connections: bool = True):
+def initdb(session: Session = NEW_SESSION, load_connections: bool = True, 
use_migration_files: bool = False):
     """Initialize Airflow database."""
     import_all_models()
 
     db_exists = _get_current_revision(session)
-    if db_exists:
-        upgradedb(session=session)
+    if db_exists or use_migration_files:
+        upgradedb(session=session, use_migration_files=use_migration_files)
     else:
         _create_db_from_orm(session=session)
     if conf.getboolean("database", "LOAD_DEFAULT_CONNECTIONS") and 
load_connections:
@@ -1557,6 +1557,7 @@ def upgradedb(
     show_sql_only: bool = False,
     reserialize_dags: bool = True,
     session: Session = NEW_SESSION,
+    use_migration_files: bool = False,
 ):
     """
     Upgrades the DB.
@@ -1613,7 +1614,7 @@ def upgradedb(
     if errors_seen:
         exit(1)
 
-    if not to_revision and not _get_current_revision(session=session):
+    if not to_revision and not _get_current_revision(session=session) and not 
use_migration_files:
         # Don't load default connections
         # New DB; initialize and exit
         initdb(session=session, load_connections=False)
@@ -1643,7 +1644,7 @@ def upgradedb(
 
 
 @provide_session
-def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
+def resetdb(session: Session = NEW_SESSION, skip_init: bool = False, 
use_migration_files: bool = False):
     """Clear out the database."""
     if not settings.engine:
         raise RuntimeError("The settings.engine must be set. This is a 
critical assertion")
@@ -1658,7 +1659,7 @@ def resetdb(session: Session = NEW_SESSION, skip_init: 
bool = False):
         drop_airflow_moved_tables(connection)
 
     if not skip_init:
-        initdb(session=session)
+        initdb(session=session, use_migration_files=use_migration_files)
 
 
 @provide_session
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index 84f3b91bf9..841a4f7ecc 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-a82d08fd5d725c40dab022228ac57a514851c1718c21fe8712ff5f12698b5d1f
\ No newline at end of file
+d6dee7810741e867800584816b914e94de069f100dfe5895ecb12b0cf86b4db4
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg 
b/docs/apache-airflow/img/airflow_erd.svg
index 2dec3838a9..2b338b6d51 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1361,7 +1361,7 @@
 <g id="edge41" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1198.1,-805.22C1228.72,-801.45 1260.55,-798.25 1290.36,-795.96"/>
-<text text-anchor="start" x="1259.36" y="-784.76" font-family="Times,serif" 
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1280.36" y="-784.76" font-family="Times,serif" 
font-size="14.00">1</text>
 <text text-anchor="start" x="1198.1" y="-794.02" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
@@ -1375,7 +1375,7 @@
 <g id="edge43" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" 
d="M1198.1,-831.83C1228.72,-828.75 1260.55,-825.32 1290.36,-821.86"/>
-<text text-anchor="start" x="1280.36" y="-825.66" font-family="Times,serif" 
font-size="14.00">1</text>
+<text text-anchor="start" x="1259.36" y="-825.66" font-family="Times,serif" 
font-size="14.00">0..N</text>
 <text text-anchor="start" x="1198.1" y="-835.63" font-family="Times,serif" 
font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 6d60470a01..226bcb53fc 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``bff083ad727d`` (head)         | ``1949afb29106``  | ``2.9.2``         | 
Remove ``idx_last_scheduling_decision`` index on             |
+| ``686269002441`` (head)         | ``bff083ad727d``  | ``2.9.2``         | 
Fix inconsistency between ORM and migration files.           |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``bff083ad727d``                | ``1949afb29106``  | ``2.9.2``         | 
Remove ``idx_last_scheduling_decision`` index on             |
 |                                 |                   |                   | 
last_scheduling_decision in dag_run table                    |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``1949afb29106``                | ``ee1467d4aa35``  | ``2.9.0``         | 
update trigger kwargs type and encrypt                       |
diff --git a/tests/cli/commands/test_db_command.py 
b/tests/cli/commands/test_db_command.py
index 60fd9ed7a0..022744c89d 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -46,12 +46,17 @@ class TestCliDb:
     def test_cli_resetdb(self, mock_resetdb):
         db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes"]))
 
-        mock_resetdb.assert_called_once_with(skip_init=False)
+        mock_resetdb.assert_called_once_with(skip_init=False, 
use_migration_files=False)
 
     @mock.patch("airflow.cli.commands.db_command.db.resetdb")
     def test_cli_resetdb_skip_init(self, mock_resetdb):
         db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes", 
"--skip-init"]))
-        mock_resetdb.assert_called_once_with(skip_init=True)
+        mock_resetdb.assert_called_once_with(skip_init=True, 
use_migration_files=False)
+
+    @mock.patch("airflow.cli.commands.db_command.db.resetdb")
+    def test_cli_resetdb_use_migration_files(self, mock_resetdb):
+        db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes", 
"--use-migration-files"]))
+        mock_resetdb.assert_called_once_with(skip_init=False, 
use_migration_files=True)
 
     @mock.patch("airflow.cli.commands.db_command.db.check_migrations")
     def test_cli_check_migrations(self, mock_wait_for_migrations):
@@ -62,36 +67,73 @@ class TestCliDb:
     @pytest.mark.parametrize(
         "args, called_with",
         [
-            ([], dict(to_revision=None, from_revision=None, 
show_sql_only=False)),
-            (["--show-sql-only"], dict(to_revision=None, from_revision=None, 
show_sql_only=True)),
-            (["--to-revision", "abc"], dict(to_revision="abc", 
from_revision=None, show_sql_only=False)),
+            ([], dict(to_revision=None, from_revision=None, 
show_sql_only=False, use_migration_files=False)),
+            (
+                ["--show-sql-only"],
+                dict(to_revision=None, from_revision=None, show_sql_only=True, 
use_migration_files=False),
+            ),
+            (
+                ["--to-revision", "abc"],
+                dict(to_revision="abc", from_revision=None, 
show_sql_only=False, use_migration_files=False),
+            ),
             (
                 ["--to-revision", "abc", "--show-sql-only"],
-                dict(to_revision="abc", from_revision=None, 
show_sql_only=True),
+                dict(to_revision="abc", from_revision=None, 
show_sql_only=True, use_migration_files=False),
             ),
             (
                 ["--to-version", "2.2.2"],
-                dict(to_revision="7b2661a43ba3", from_revision=None, 
show_sql_only=False),
+                dict(
+                    to_revision="7b2661a43ba3",
+                    from_revision=None,
+                    show_sql_only=False,
+                    use_migration_files=False,
+                ),
             ),
             (
                 ["--to-version", "2.2.2", "--show-sql-only"],
-                dict(to_revision="7b2661a43ba3", from_revision=None, 
show_sql_only=True),
+                dict(
+                    to_revision="7b2661a43ba3",
+                    from_revision=None,
+                    show_sql_only=True,
+                    use_migration_files=False,
+                ),
             ),
             (
                 ["--to-revision", "abc", "--from-revision", "abc123", 
"--show-sql-only"],
-                dict(to_revision="abc", from_revision="abc123", 
show_sql_only=True),
+                dict(
+                    to_revision="abc", from_revision="abc123", 
show_sql_only=True, use_migration_files=False
+                ),
             ),
             (
                 ["--to-revision", "abc", "--from-version", "2.2.2", 
"--show-sql-only"],
-                dict(to_revision="abc", from_revision="7b2661a43ba3", 
show_sql_only=True),
+                dict(
+                    to_revision="abc",
+                    from_revision="7b2661a43ba3",
+                    show_sql_only=True,
+                    use_migration_files=False,
+                ),
             ),
             (
                 ["--to-version", "2.2.4", "--from-revision", "abc123", 
"--show-sql-only"],
-                dict(to_revision="587bdf053233", from_revision="abc123", 
show_sql_only=True),
+                dict(
+                    to_revision="587bdf053233",
+                    from_revision="abc123",
+                    show_sql_only=True,
+                    use_migration_files=False,
+                ),
             ),
             (
                 ["--to-version", "2.2.4", "--from-version", "2.2.2", 
"--show-sql-only"],
-                dict(to_revision="587bdf053233", from_revision="7b2661a43ba3", 
show_sql_only=True),
+                dict(
+                    to_revision="587bdf053233",
+                    from_revision="7b2661a43ba3",
+                    show_sql_only=True,
+                    use_migration_files=False,
+                ),
+            ),
+            (
+                ["--use-migration-files", "--show-sql-only"],
+                dict(to_revision=None, from_revision=None, 
use_migration_files=True, show_sql_only=True),
             ),
         ],
     )
diff --git a/tests/conftest.py b/tests/conftest.py
index b6dcaf47af..290469c9b4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -311,7 +311,7 @@ def initial_db_init():
     from airflow.www.extensions.init_appbuilder import init_appbuilder
     from airflow.www.extensions.init_auth_manager import get_auth_manager
 
-    db.resetdb()
+    db.resetdb(use_migration_files=True)
     db.bootstrap_dagbag()
     # minimal app to add roles
     flask_app = Flask(__name__)
diff --git a/tests/system/core/example_external_task_child_deferrable.py 
b/tests/system/core/example_external_task_child_deferrable.py 
similarity index 100%
rename from tests/system/core/example_external_task_child_deferrable.py
rename to tests/system/core/example_external_task_child_deferrable.py 
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 657f5fbc7c..0b4347f589 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -84,6 +84,7 @@ class TestDb:
             # Ignore flask-session table/index
             lambda t: (t[0] == "remove_table" and t[1].name == "session"),
             lambda t: (t[0] == "remove_index" and t[1].name == "session_id"),
+            lambda t: (t[0] == "remove_index" and t[1].name == 
"session_session_id_uq"),
             # sqlite sequence is used for autoincrementing columns created 
with `sqlite_autoincrement` option
             lambda t: (t[0] == "remove_table" and t[1].name == 
"sqlite_sequence"),
         ]
@@ -233,7 +234,7 @@ class TestDb:
         if skip_init:
             mock_init.assert_not_called()
         else:
-            mock_init.assert_called_once_with(session=session_mock)
+            mock_init.assert_called_once_with(session=session_mock, 
use_migration_files=False)
 
     def test_alembic_configuration(self):
         with mock.patch.dict(

Reply via email to