This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 79722e3dee Fernet-key-rotation-optimisation (#40786)
79722e3dee is described below
commit 79722e3deeffdd4d61c6e5f377baa6c4d2296fc5
Author: Bartosz Jankiewicz <[email protected]>
AuthorDate: Wed Jul 17 17:50:37 2024 +0200
Fernet-key-rotation-optimisation (#40786)
* Current implementation of Fernet key rotation implicitly executes `all()`
method on the processed tables leading to loading all rows to memory.
It's been observed that some users store additional data in `variable`
table which is leading to memory issues during the operation.
This change introduces batch processing of fernet key rotation to avoid it.
To be consistent across the tables (`variable`, `connection`, `trigger`) the
batching operation was added for all of them.
---------
Co-authored-by: bjankiewicz <[email protected]>
---
airflow/cli/commands/rotate_fernet_key_command.py | 54 +++++++++++++++++++----
1 file changed, 46 insertions(+), 8 deletions(-)
diff --git a/airflow/cli/commands/rotate_fernet_key_command.py
b/airflow/cli/commands/rotate_fernet_key_command.py
index f4c3261573..dc2ade361c 100644
--- a/airflow/cli/commands/rotate_fernet_key_command.py
+++ b/airflow/cli/commands/rotate_fernet_key_command.py
@@ -24,17 +24,55 @@ from airflow.models import Connection, Trigger, Variable
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.session import create_session
+from airflow.utils.sqlalchemy import is_sqlalchemy_v1
@cli_utils.action_cli
@providers_configuration_loaded
def rotate_fernet_key(args):
- """Rotates all encrypted connection credentials and variables."""
+ """Rotates all encrypted connection credentials, triggers and variables."""
+ batch_size = 100
+ rotate_method = rotate_items_in_batches_v1 if is_sqlalchemy_v1() else
rotate_items_in_batches_v2
with create_session() as session:
- conns_query = select(Connection).where(Connection.is_encrypted |
Connection.is_extra_encrypted)
- for conn in session.scalars(conns_query):
- conn.rotate_fernet_key()
- for var in
session.scalars(select(Variable).where(Variable.is_encrypted)):
- var.rotate_fernet_key()
- for trigger in session.scalars(select(Trigger)):
- trigger.rotate_fernet_key()
+ with session.begin(): # Start a single transaction
+ rotate_method(
+ session,
+ Connection,
+ filter_condition=Connection.is_encrypted |
Connection.is_extra_encrypted,
+ batch_size=batch_size,
+ )
+ rotate_method(session, Variable,
filter_condition=Variable.is_encrypted, batch_size=batch_size)
+ rotate_method(session, Trigger, filter_condition=None,
batch_size=batch_size)
+
+
+def rotate_items_in_batches_v1(session, model_class, filter_condition=None,
batch_size=100):
+ """Rotates Fernet keys for items of a given model in batches to avoid
excessive memory usage.
+
+ This function is a replacement for yield_per, which is not available in
SQLAlchemy 1.x.
+ """
+ offset = 0
+ while True:
+ query = select(model_class)
+ if filter_condition is not None:
+ query = query.where(filter_condition)
+ query = query.offset(offset).limit(batch_size)
+ items = session.scalars(query).all()
+ if not items:
+ break # No more items to process
+ for item in items:
+ item.rotate_fernet_key()
+ offset += batch_size
+
+
+def rotate_items_in_batches_v2(session, model_class, filter_condition=None,
batch_size=100):
+ """Rotates Fernet keys for items of a given model in batches to avoid
excessive memory usage.
+
+ This function is taking advantage of yield_per available in SQLAlchemy 2.x.
+ """
+ while True:
+ query = select(model_class)
+ if filter_condition is not None:
+ query = query.where(filter_condition)
+ items = session.scalars(query).yield_per(batch_size)
+ for item in items:
+ item.rotate_fernet_key()