This is an automated email from the ASF dual-hosted git repository.

michaelsmolina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new bac61fc24e chore: Allow auto pruning of the query table (#29936)
bac61fc24e is described below

commit bac61fc24ecee952a34e46c1ecda92b161f17b67
Author: Michael S. Molina <[email protected]>
AuthorDate: Mon Aug 19 07:05:18 2024 -0400

    chore: Allow auto pruning of the query table (#29936)
---
 superset/commands/sql_lab/query.py | 109 +++++++++++++++++++++++++++++++++++++
 superset/config.py                 |   6 ++
 superset/tasks/scheduler.py        |  14 +++++
 3 files changed, 129 insertions(+)

diff --git a/superset/commands/sql_lab/query.py b/superset/commands/sql_lab/query.py
new file mode 100644
index 0000000000..87bc0d28ad
--- /dev/null
+++ b/superset/commands/sql_lab/query.py
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from datetime import datetime, timedelta
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.commands.exceptions import CommandException
+from superset.models.sql_lab import Query
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class QueryPruneCommand(BaseCommand):
+    """
+    Command to prune the query table by deleting rows older than the specified 
retention period.
+
+    This command deletes records from the `Query` table that have not been 
changed within the
+    specified number of days. It helps in maintaining the database by removing 
outdated entries
+    and freeing up space.
+
+    Attributes:
+        retention_period_days (int): The number of days for which records 
should be retained.
+                                     Records older than this period will be 
deleted.
+    """
+
+    def __init__(self, retention_period_days: int):
+        """
+        :param retention_period_days: Number of days to keep in the query table
+        """
+        self.retention_period_days = retention_period_days
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs that need to be deleted
+        ids_to_delete = (
+            db.session.execute(
+                sa.select(Query.id).where(
+                    Query.changed_on
+                    < datetime.now() - 
timedelta(days=self.retention_period_days)
+                )
+            )
+            .scalars()
+            .all()
+        )
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", total_rows)
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = 
db.session.execute(sa.delete(Query).where(Query.id.in_(batch_ids)))
+
+            # Update the total number of deleted records
+            total_deleted += result.rowcount
+
+            # Explicitly commit the transaction given that if an error occurs, 
we want to ensure that the
+            # records that have been deleted so far are committed
+            db.session.commit()
+
+            # Log the number of deleted records every 1% increase in progress
+            percentage_complete = (total_deleted / total_rows) * 100
+            if percentage_complete >= next_logging_threshold:
+                logger.info(
+                    "Deleted %s rows from the query table older than %s days 
(%d%% complete)",
+                    total_deleted,
+                    self.retention_period_days,
+                    percentage_complete,
+                )
+                next_logging_threshold += 1
+
+        elapsed_time = time.time() - start_time
+        minutes, seconds = divmod(elapsed_time, 60)
+        formatted_time = f"{int(minutes):02}:{int(seconds):02}"
+        logger.info(
+            "Pruning complete: %s rows deleted in %s", total_deleted, 
formatted_time
+        )
+
+    def validate(self) -> None:
+        pass
diff --git a/superset/config.py b/superset/config.py
index 2fa8ef72ba..5b30397a0a 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -1000,6 +1000,12 @@ class CeleryConfig:  # pylint: disable=too-few-public-methods
             "task": "reports.prune_log",
             "schedule": crontab(minute=0, hour=0),
         },
+        # Uncomment to enable pruning of the query table
+        # "prune_query": {
+        #     "task": "prune_query",
+        #     "schedule": crontab(minute=0, hour=0, day_of_month=1),
+        #     "options": {"retention_period_days": 180},
+        # },
     }
 
 
diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py
index 652d556e89..ce963eff64 100644
--- a/superset/tasks/scheduler.py
+++ b/superset/tasks/scheduler.py
@@ -25,6 +25,7 @@ from superset.commands.exceptions import CommandException
 from superset.commands.report.exceptions import ReportScheduleUnexpectedError
 from superset.commands.report.execute import AsyncExecuteReportScheduleCommand
 from superset.commands.report.log_prune import 
AsyncPruneReportScheduleLogCommand
+from superset.commands.sql_lab.query import QueryPruneCommand
 from superset.daos.report import ReportScheduleDAO
 from superset.extensions import celery_app
 from superset.stats_logger import BaseStatsLogger
@@ -119,3 +120,16 @@ def prune_log() -> None:
         logger.warning("A timeout occurred while pruning report schedule logs: 
%s", ex)
     except CommandException:
         logger.exception("An exception occurred while pruning report schedule 
logs")
+
+
+@celery_app.task(name="prune_query")
+def prune_query() -> None:
+    stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
+    stats_logger.incr("prune_query")
+
+    try:
+        QueryPruneCommand(
+            prune_query.request.properties.get("retention_period_days")
+        ).run()
+    except CommandException as ex:
+        logger.exception("An error occurred while pruning queries: %s", ex)

Reply via email to