This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7efdeed5ec Add missing AUTOINC/SERIAL for FAB tables (#26885)
7efdeed5ec is described below
commit 7efdeed5eccbf5cb709af40c8c66757e59c957ed
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Oct 7 17:37:55 2022 +0100
Add missing AUTOINC/SERIAL for FAB tables (#26885)
* Add missing AUTOINC/SERIAL for FAB tables
In 1.10.13 we introduced a migration that creates the tables with the
server_default but that migration only did anything if the tables didn't
already exist. But the tables created by the FAB model have a default
(but not a server_default).
Oh, and the final bit of the puzzle: in 2.4 we finally "took control" of
the FAB security models, moving them into Airflow, and those do not have
the default set.
* Update airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
* Fix static checks
* Run migrations with a pool of exactly one connection.
Without this, `create_session()` will open a new connection, and that causes
MySQL to hang waiting to get a "metadata lock on table".
Using the "stock" pool with size=1 and max_overflow=0 doesn't work; that
instead times out if you try to get a new connection from the pool.
SingletonThreadPool instead returns the existing active connection which
is what we want.
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/migrations/env.py | 8 ++-
.../versions/0118_2_4_2_add_missing_autoinc_fab.py | 78 ++++++++++++++++++++++
... 0119_2_5_0_add_updated_at_to_dagrun_and_ti.py} | 4 +-
airflow/settings.py | 15 +++--
airflow/utils/db.py | 17 ++++-
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
docs/apache-airflow/migrations-ref.rst | 4 +-
7 files changed, 115 insertions(+), 13 deletions(-)
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 6474f0799c..9dcd29e9ca 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import contextlib
from logging.config import fileConfig
from alembic import context
@@ -89,9 +90,12 @@ def run_migrations_online():
and associate a connection with the context.
"""
- connectable = settings.engine
+ with contextlib.ExitStack() as stack:
+ connection = config.attributes.get('connection', None)
+
+ if not connection:
+ connection = stack.push(settings.engine.connect())
- with connectable.connect() as connection:
context.configure(
connection=connection,
transaction_per_migration=True,
diff --git a/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
new file mode 100644
index 0000000000..f6becd9dfe
--- /dev/null
+++ b/airflow/migrations/versions/0118_2_4_2_add_missing_autoinc_fab.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add missing auto-increment to columns on FAB tables
+
+Revision ID: b0d31815b5a6
+Revises: ecb43d2a1842
+Create Date: 2022-10-05 13:16:45.638490
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'b0d31815b5a6'
+down_revision = 'ecb43d2a1842'
+branch_labels = None
+depends_on = None
+airflow_version = '2.4.2'
+
+
+def upgrade():
+ """Apply migration.
+
+ If these columns are already of the right type (i.e. created by our
+ migration in 1.10.13 rather than FAB itself in an earlier version), this
+ migration will issue an alter statement to change them to what they already
+ are -- i.e. it's a no-op.
+
+ These tables are small (100 to low 1k rows at most), so it's not too costly
+ to change them.
+ """
+ conn = op.get_bind()
+ if conn.dialect.name in ['mssql', 'sqlite']:
+ # 1.10.12 didn't support SQL Server, so it couldn't have gotten this
wrong --> nothing to correct
+ # SQLite autoinc was "implicit" for an INTEGER NOT NULL PRIMARY KEY
+ return
+
+ for table in (
+ 'ab_permission',
+ 'ab_view_menu',
+ 'ab_role',
+ 'ab_permission_view',
+ 'ab_permission_view_role',
+ 'ab_user',
+ 'ab_user_role',
+ 'ab_register_user',
+ ):
+ with op.batch_alter_table(table) as batch:
+ kwargs = {}
+ if conn.dialect.name == 'postgresql':
+ kwargs['type_'] = sa.Sequence(f'{table}_id_seq').next_value()
+ else:
+ kwargs['autoincrement'] = True
+ batch.alter_column("id", existing_type=sa.Integer(),
existing_nullable=False, **kwargs)
+
+
+def downgrade():
+ """Unapply add_missing_autoinc_fab"""
+ # No downgrade needed, these _should_ have applied from 1.10.13 but didn't
due to a previous bug!
diff --git
a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py
b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
similarity index 97%
rename from
airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py
rename to
airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
index fc58052965..2e249b9233 100644
--- a/airflow/migrations/versions/0118_2_5_0_add_updated_at_to_dagrun_and_ti.py
+++ b/airflow/migrations/versions/0119_2_5_0_add_updated_at_to_dagrun_and_ti.py
@@ -19,7 +19,7 @@
"""Add updated_at column to DagRun and TaskInstance
Revision ID: ee8d93fcc81e
-Revises: ecb43d2a1842
+Revises: b0d31815b5a6
Create Date: 2022-09-08 19:08:37.623121
"""
@@ -33,7 +33,7 @@ from airflow.migrations.db_types import TIMESTAMP
# revision identifiers, used by Alembic.
revision = 'ee8d93fcc81e'
-down_revision = 'ecb43d2a1842'
+down_revision = 'b0d31815b5a6'
branch_labels = None
depends_on = None
airflow_version = '2.5.0'
diff --git a/airflow/settings.py b/airflow/settings.py
index 10e963d5f7..696b4b652a 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -259,14 +259,14 @@ def configure_vars():
DONOT_MODIFY_HANDLERS = conf.getboolean('logging',
'donot_modify_handlers', fallback=False)
-def configure_orm(disable_connection_pool=False):
+def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy"""
from airflow.utils.log.secrets_masker import mask_secret
log.debug("Setting up DB connection pool (PID %s)", os.getpid())
global engine
global Session
- engine_args = prepare_engine_args(disable_connection_pool)
+ engine_args = prepare_engine_args(disable_connection_pool, pool_class)
if conf.has_option('database', 'sql_alchemy_connect_args'):
connect_args = conf.getimport('database', 'sql_alchemy_connect_args')
@@ -319,7 +319,7 @@ DEFAULT_ENGINE_ARGS = {
}
-def prepare_engine_args(disable_connection_pool=False):
+def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args"""
default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
@@ -331,7 +331,10 @@ def prepare_engine_args(disable_connection_pool=False):
'database', 'sql_alchemy_engine_args', fallback=default_args
) # type: ignore
- if disable_connection_pool or not conf.getboolean('database',
'SQL_ALCHEMY_POOL_ENABLED'):
+ if pool_class:
+ # Don't use separate settings for size etc, only those from
sql_alchemy_engine_args
+ engine_args['poolclass'] = pool_class
+ elif disable_connection_pool or not conf.getboolean('database',
'SQL_ALCHEMY_POOL_ENABLED'):
engine_args['poolclass'] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith('sqlite'):
@@ -413,10 +416,10 @@ def dispose_orm():
engine = None
-def reconfigure_orm(disable_connection_pool=False):
+def reconfigure_orm(disable_connection_pool=False, pool_class=None):
"""Properly close database connections and re-configure ORM"""
dispose_orm()
- configure_orm(disable_connection_pool=disable_connection_pool)
+ configure_orm(disable_connection_pool=disable_connection_pool,
pool_class=pool_class)
def configure_adapters():
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index e6dfcf9f2e..f796156f8f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1539,8 +1539,23 @@ def upgradedb(
initdb(session=session, load_connections=False)
return
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+ import sqlalchemy.pool
+
log.info("Creating tables")
- command.upgrade(config, revision=to_revision or 'heads')
+ val = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
+ try:
+ # Reconfigure the ORM to use _EXACTLY_ one connection, otherwise
some db engines hang forever
+ # trying to ALTER TABLEs
+ os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = '1'
+
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
+ command.upgrade(config, revision=to_revision or 'heads')
+ finally:
+ if val is None:
+ os.environ.pop('AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE')
+ else:
+ os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE'] = val
+ settings.reconfigure_orm()
+
reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index efbf555b9e..26f6e90fbf 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-f15522e5fdd4bc7ddd43b5f9e5ac73a7fd408bf976a23715603463718a1aecf3
\ No newline at end of file
+5d0b9bcb02f09e99338b2c230cf2c7b1e8af7f7ea675eca6f31e49e851e11941
\ No newline at end of file
diff --git a/docs/apache-airflow/migrations-ref.rst
b/docs/apache-airflow/migrations-ref.rst
index 3b92588ba0..f09052ca7b 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version |
Description |
+=================================+===================+===================+==============================================================+
-| ``ee8d93fcc81e`` (head) | ``ecb43d2a1842`` | ``2.5.0`` |
Add updated_at column to DagRun and TaskInstance |
+| ``ee8d93fcc81e`` (head) | ``b0d31815b5a6`` | ``2.5.0`` |
Add updated_at column to DagRun and TaskInstance |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``b0d31815b5a6`` | ``ecb43d2a1842`` | ``2.4.2`` |
Add missing auto-increment to columns on FAB tables |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ecb43d2a1842`` | ``1486deb605b4`` | ``2.4.0`` |
Add processor_subdir column to DagModel, SerializedDagModel |
| | | |
and CallbackRequest tables. |