This is an automated email from the ASF dual-hosted git repository.
michaelsmolina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new bac61fc24e chore: Allow auto pruning of the query table (#29936)
bac61fc24e is described below
commit bac61fc24ecee952a34e46c1ecda92b161f17b67
Author: Michael S. Molina <[email protected]>
AuthorDate: Mon Aug 19 07:05:18 2024 -0400
chore: Allow auto pruning of the query table (#29936)
---
superset/commands/sql_lab/query.py | 109 +++++++++++++++++++++++++++++++++++++
superset/config.py | 6 ++
superset/tasks/scheduler.py | 14 +++++
3 files changed, 129 insertions(+)
diff --git a/superset/commands/sql_lab/query.py
b/superset/commands/sql_lab/query.py
new file mode 100644
index 0000000000..87bc0d28ad
--- /dev/null
+++ b/superset/commands/sql_lab/query.py
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.models.sql_lab import Query
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class QueryPruneCommand(BaseCommand):
+ """
+ Command to prune the query table by deleting rows older than the specified
retention period.
+
+ This command deletes records from the `Query` table that have not been
changed within the
+ specified number of days. It helps in maintaining the database by removing
outdated entries
+ and freeing up space.
+
+ Attributes:
+ retention_period_days (int): The number of days for which records
should be retained.
+ Records older than this period will be
deleted.
+ """
+
+ def __init__(self, retention_period_days: int):
+ """
+ :param retention_period_days: Number of days to keep in the query table
+ """
+ self.retention_period_days = retention_period_days
+
+ def run(self) -> None:
+ """
+ Executes the prune command
+ """
+ batch_size = 999 # SQLite has a IN clause limit of 999
+ total_deleted = 0
+ start_time = time.time()
+
+ # Select all IDs that need to be deleted
+ ids_to_delete = (
+ db.session.execute(
+ sa.select(Query.id).where(
+ Query.changed_on
+ < datetime.now() -
timedelta(days=self.retention_period_days)
+ )
+ )
+ .scalars()
+ .all()
+ )
+
+ total_rows = len(ids_to_delete)
+
+ logger.info("Total rows to be deleted: %s", total_rows)
+
+ next_logging_threshold = 1
+
+ # Iterate over the IDs in batches
+ for i in range(0, total_rows, batch_size):
+ batch_ids = ids_to_delete[i : i + batch_size]
+
+ # Delete the selected batch using IN clause
+ result =
db.session.execute(sa.delete(Query).where(Query.id.in_(batch_ids)))
+
+ # Update the total number of deleted records
+ total_deleted += result.rowcount
+
+ # Explicitly commit the transaction given that if an error occurs,
we want to ensure that the
+ # records that have been deleted so far are committed
+ db.session.commit()
+
+ # Log the number of deleted records every 1% increase in progress
+ percentage_complete = (total_deleted / total_rows) * 100
+ if percentage_complete >= next_logging_threshold:
+ logger.info(
+ "Deleted %s rows from the query table older than %s days
(%d%% complete)",
+ total_deleted,
+ self.retention_period_days,
+ percentage_complete,
+ )
+ next_logging_threshold += 1
+
+ elapsed_time = time.time() - start_time
+ minutes, seconds = divmod(elapsed_time, 60)
+ formatted_time = f"{int(minutes):02}:{int(seconds):02}"
+ logger.info(
+ "Pruning complete: %s rows deleted in %s", total_deleted,
formatted_time
+ )
+
+ def validate(self) -> None:
+ pass
diff --git a/superset/config.py b/superset/config.py
index 2fa8ef72ba..5b30397a0a 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -1000,6 +1000,12 @@ class CeleryConfig: # pylint:
disable=too-few-public-methods
"task": "reports.prune_log",
"schedule": crontab(minute=0, hour=0),
},
+ # Uncomment to enable pruning of the query table
+ # "prune_query": {
+ # "task": "prune_query",
+ # "schedule": crontab(minute=0, hour=0, day_of_month=1),
+ # "options": {"retention_period_days": 180},
+ # },
}
diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py
index 652d556e89..ce963eff64 100644
--- a/superset/tasks/scheduler.py
+++ b/superset/tasks/scheduler.py
@@ -25,6 +25,7 @@ from superset.commands.exceptions import CommandException
from superset.commands.report.exceptions import ReportScheduleUnexpectedError
from superset.commands.report.execute import AsyncExecuteReportScheduleCommand
from superset.commands.report.log_prune import
AsyncPruneReportScheduleLogCommand
+from superset.commands.sql_lab.query import QueryPruneCommand
from superset.daos.report import ReportScheduleDAO
from superset.extensions import celery_app
from superset.stats_logger import BaseStatsLogger
@@ -119,3 +120,16 @@ def prune_log() -> None:
logger.warning("A timeout occurred while pruning report schedule logs:
%s", ex)
except CommandException:
logger.exception("An exception occurred while pruning report schedule
logs")
+
+
+@celery_app.task(name="prune_query")
+def prune_query() -> None:
+ stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
+ stats_logger.incr("prune_query")
+
+ try:
+ QueryPruneCommand(
+ prune_query.request.properties.get("retention_period_days")
+ ).run()
+ except CommandException as ex:
+ logger.exception("An error occurred while pruning queries: %s", ex)