This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7efdeed5ec Add missing AUTOINC/SERIAL for FAB tables (#26885)
7efdeed5ec is described below

commit 7efdeed5eccbf5cb709af40c8c66757e59c957ed
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Oct 7 17:37:55 2022 +0100

    Add missing AUTOINC/SERIAL for FAB tables (#26885)
    
    * Add missing AUTOINC/SERIAL for FAB tables
    
    In 1.10.13 we introduced a migration that creates the tables with the
    server_default but that migration only did anything if the tables didn't
    already exist. But the tables created by the FAB model have a default
    (but not a server_default).
    
    Oh, and the final bit of the puzzle, in 2.4 we finally "took control" of
    the FAB security models into Airflow and those do not have the default
    set.
    
    * Update airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
    
    * Fix static checks
    
    * Run migrations with a pool of a single connection.
    
    Without this `create_session()` will open a new connection, and that
    causes mysql to hang waiting to get a "metadata lock on table".
    
    Using the "stock" pool with size=1 and max_overflow=0 doesn't work, that
    instead times out if you try to get a new connection from the pool.
    SingletonThreadPool instead returns the existing active connection which
    is what we want.
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/migrations/env.py                          |  8 ++-
 .../versions/0118_2_4_2_add_missing_autoinc_fab.py | 78 ++++++++++++++++++++++
 ... 0119_2_5_0_add_updated_at_to_dagrun_and_ti.py} |  4 +-
 airflow/settings.py                                | 15 +++--
 airflow/utils/db.py                                | 17 ++++-
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 7 files changed, 115 insertions(+), 13 deletions(-)

diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 6474f0799c..9dcd29e9ca 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import contextlib
 from logging.config import fileConfig
 
 from alembic import context
@@ -89,9 +90,12 @@ def run_migrations_online():
     and associate a connection with the context.
 
     """
-    connectable = settings.engine
+    with contextlib.ExitStack() as stack:
+        connection = config.attributes.get('connection', None)
+
+        if not connection:
+            connection = stack.push(settings.engine.connect())
 
-    with connectable.connect() as connection:
         context.configure(
             connection=connection,
             transaction_per_migration=True,
diff --git a/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py 
b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
new file mode 100644
index 0000000000..f6becd9dfe
--- /dev/null
+++ b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add missing auto-increment to columns on FAB tables
+
+Revision ID: b0d31815b5a6
+Revises: ecb43d2a1842
+Create Date: 2022-10-05 13:16:45.638490
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'b0d31815b5a6'
+down_revision = 'ecb43d2a1842'
+branch_labels = None
+depends_on = None
+airflow_version = '2.4.2'
+
+
+def upgrade():
+    """Apply migration: make the ``id`` column of each FAB table auto-generated.
+
+    If these columns are already of the right type (i.e. created by our
+    migration in 1.10.13 rather than FAB itself in an earlier version), this
+    migration will issue an alter statement to change them to what they already
+    are -- i.e. it's a no-op.
+
+    These tables are small (100 to low 1k rows at most), so it's not too costly
+    to change them.
+    """
+    conn = op.get_bind()
+    if conn.dialect.name in ['mssql', 'sqlite']:
+        # 1.10.12 didn't support SQL Server, so it couldn't have gotten this wrong --> nothing to correct
+        # SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY
+        return
+
+    for table in (
+        'ab_permission',
+        'ab_view_menu',
+        'ab_role',
+        'ab_permission_view',
+        'ab_permission_view_role',
+        'ab_user',
+        'ab_user_role',
+        'ab_register_user',
+    ):
+        with op.batch_alter_table(table) as batch:
+            if conn.dialect.name == 'postgresql':
+                # The sequence is a server-side default, not a column type.
+                kwargs = {'server_default': sa.Sequence(f'{table}_id_seq').next_value()}
+            else:
+                kwargs = {'autoincrement': True}
+            batch.alter_column("id", existing_type=sa.Integer(), existing_nullable=False, **kwargs)
+
+
+def downgrade():
+    """Unapply add_missing_autoinc_fab; intentionally a no-op."""
+    # No downgrade needed, these _should_ have applied from 1.10.13 but didn't due to a previous bug!
diff --git 
a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py 
b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
similarity index 97%
rename from 
airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py
rename to 
airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
index fc58052965..2e249b9233 100644
--- a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py
+++ b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
@@ -19,7 +19,7 @@
 """Add updated_at column to DagRun and TaskInstance
 
 Revision ID: ee8d93fcc81e
-Revises: ecb43d2a1842
+Revises: b0d31815b5a6
 Create Date: 2022-09-08 19:08:37.623121
 
 """
@@ -33,7 +33,7 @@ from airflow.migrations.db_types import TIMESTAMP
 
 # revision identifiers, used by Alembic.
 revision = 'ee8d93fcc81e'
-down_revision = 'ecb43d2a1842'
+down_revision = 'b0d31815b5a6'
 branch_labels = None
 depends_on = None
 airflow_version = '2.5.0'
diff --git a/airflow/settings.py b/airflow/settings.py
index 10e963d5f7..696b4b652a 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -259,14 +259,14 @@ def configure_vars():
     DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 
'donot_modify_handlers', fallback=False)
 
 
-def configure_orm(disable_connection_pool=False):
+def configure_orm(disable_connection_pool=False, pool_class=None):
     """Configure ORM using SQLAlchemy"""
     from airflow.utils.log.secrets_masker import mask_secret
 
     log.debug("Setting up DB connection pool (PID %s)", os.getpid())
     global engine
     global Session
-    engine_args = prepare_engine_args(disable_connection_pool)
+    engine_args = prepare_engine_args(disable_connection_pool, pool_class)
 
     if conf.has_option('database', 'sql_alchemy_connect_args'):
         connect_args = conf.getimport('database', 'sql_alchemy_connect_args')
@@ -319,7 +319,7 @@ DEFAULT_ENGINE_ARGS = {
 }
 
 
-def prepare_engine_args(disable_connection_pool=False):
+def prepare_engine_args(disable_connection_pool=False, pool_class=None):
     """Prepare SQLAlchemy engine args"""
     default_args = {}
     for dialect, default in DEFAULT_ENGINE_ARGS.items():
@@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False):
         'database', 'sql_alchemy_engine_args', fallback=default_args
     )  # type: ignore
 
-    if disable_connection_pool or not conf.getboolean('database', 
'SQL_ALCHEMY_POOL_ENABLED'):
+    if pool_class:
+        # Don't use separate settings for size etc, only those from 
sql_alchemy_engine_args
+        engine_args['poolclass'] = pool_class
+    elif disable_connection_pool or not conf.getboolean('database', 
'SQL_ALCHEMY_POOL_ENABLED'):
         engine_args['poolclass'] = NullPool
         log.debug("settings.prepare_engine_args(): Using NullPool")
     elif not SQL_ALCHEMY_CONN.startswith('sqlite'):
@@ -413,10 +416,10 @@ def dispose_orm():
         engine = None
 
 
-def reconfigure_orm(disable_connection_pool=False):
+def reconfigure_orm(disable_connection_pool=False, pool_class=None):
     """Properly close database connections and re-configure ORM"""
     dispose_orm()
-    configure_orm(disable_connection_pool=disable_connection_pool)
+    configure_orm(disable_connection_pool=disable_connection_pool, 
pool_class=pool_class)
 
 
 def configure_adapters():
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index e6dfcf9f2e..f796156f8f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1539,8 +1539,23 @@ def upgradedb(
         initdb(session=session, load_connections=False)
         return
     with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+        import sqlalchemy.pool
+
         log.info("Creating tables")
-        command.upgrade(config, revision=to_revision or 'heads')
+        val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
+        try:
+            # Reconfigure the ORM to use _EXACTLY_ one connection, otherwise some db engines hang forever
+            # trying to ALTER TABLEs
+            os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1'
+            
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
+            command.upgrade(config, revision=to_revision or 'heads')
+        finally:
+            if val is None:
+                os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
+            else:
+                os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val
+            settings.reconfigure_orm()
+
     reserialize_dags(session=session)
     add_default_pool_if_not_exists(session=session)
     synchronize_log_template(session=session)
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index efbf555b9e..26f6e90fbf 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-f15522e5fdd4bc7ddd43b5f9e5ac73a7fd408bf976a23715603463718a1aecf3
\ No newline at end of file
+5d0b9bcb02f09e99338b2c230cf2c7b1e8af7f7ea675eca6f31e49e851e11941
\ No newline at end of file
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 3b92588ba0..f09052ca7b 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``ee8d93fcc81e`` (head)         | ``ecb43d2a1842``  | ``2.5.0``         | 
Add updated_at column to DagRun and TaskInstance             |
+| ``ee8d93fcc81e`` (head)         | ``b0d31815b5a6``  | ``2.5.0``         | 
Add updated_at column to DagRun and TaskInstance             |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``b0d31815b5a6``                | ``ecb43d2a1842``  | ``2.4.2``         | 
Add missing auto-increment to columns on FAB tables          |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``ecb43d2a1842``                | ``1486deb605b4``  | ``2.4.0``         | 
Add processor_subdir column to DagModel, SerializedDagModel  |
 |                                 |                   |                   | 
and CallbackRequest tables.                                  |

Reply via email to