This is an automated email from the ASF dual-hosted git repository.
michaelsmolina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new df06bdf33b fix: Signature of Celery pruner jobs (#32699)
df06bdf33b is described below
commit df06bdf33b460fbf9b37dbf2b20e14d1c490e414
Author: Michael S. Molina <[email protected]>
AuthorDate: Mon Mar 17 13:52:31 2025 -0300
fix: Signature of Celery pruner jobs (#32699)
---
superset/commands/logs/prune.py | 8 +++++---
superset/commands/sql_lab/query.py | 8 +++++---
superset/tasks/scheduler.py | 20 +++++++++++++-------
3 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/superset/commands/logs/prune.py b/superset/commands/logs/prune.py
index 6a9cffb3f7..9ad031ea51 100644
--- a/superset/commands/logs/prune.py
+++ b/superset/commands/logs/prune.py
@@ -69,7 +69,7 @@ class LogPruneCommand(BaseCommand):
total_rows = len(ids_to_delete)
- logger.info("Total rows to be deleted: %s", total_rows)
+ logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
next_logging_threshold = 1
@@ -92,7 +92,7 @@ class LogPruneCommand(BaseCommand):
if percentage_complete >= next_logging_threshold:
logger.info(
"Deleted %s rows from the logs table older than %s days (%d%% complete)", # noqa: E501
- total_deleted,
+ f"{total_deleted:,}",
self.retention_period_days,
percentage_complete,
)
@@ -102,7 +102,9 @@ class LogPruneCommand(BaseCommand):
minutes, seconds = divmod(elapsed_time, 60)
formatted_time = f"{int(minutes):02}:{int(seconds):02}"
logger.info(
- "Pruning complete: %s rows deleted in %s", total_deleted, formatted_time
+ "Pruning complete: %s rows deleted in %s",
+ f"{total_deleted:,}",
+ formatted_time,
)
def validate(self) -> None:
diff --git a/superset/commands/sql_lab/query.py b/superset/commands/sql_lab/query.py
index 466d5f99c9..b5263fc2e2 100644
--- a/superset/commands/sql_lab/query.py
+++ b/superset/commands/sql_lab/query.py
@@ -69,7 +69,7 @@ class QueryPruneCommand(BaseCommand):
total_rows = len(ids_to_delete)
- logger.info("Total rows to be deleted: %s", total_rows)
+ logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
next_logging_threshold = 1
@@ -92,7 +92,7 @@ class QueryPruneCommand(BaseCommand):
if percentage_complete >= next_logging_threshold:
logger.info(
"Deleted %s rows from the query table older than %s days (%d%% complete)", # noqa: E501
- total_deleted,
+ f"{total_deleted:,}",
self.retention_period_days,
percentage_complete,
)
@@ -102,7 +102,9 @@ class QueryPruneCommand(BaseCommand):
minutes, seconds = divmod(elapsed_time, 60)
formatted_time = f"{int(minutes):02}:{int(seconds):02}"
logger.info(
- "Pruning complete: %s rows deleted in %s", total_deleted, formatted_time
+ "Pruning complete: %s rows deleted in %s",
+ f"{total_deleted:,}",
+ formatted_time,
)
def validate(self) -> None:
diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py
index f894a6794b..3fec34d816 100644
--- a/superset/tasks/scheduler.py
+++ b/superset/tasks/scheduler.py
@@ -14,11 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
import logging
from datetime import datetime, timezone
-from typing import Optional
+from typing import Any
-from celery import Celery
+from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from superset import app, is_feature_enabled
@@ -77,7 +79,7 @@ def scheduler() -> None:
@celery_app.task(name="reports.execute", bind=True)
-def execute(self: Celery.task, report_schedule_id: int) -> None:
+def execute(self: Task, report_schedule_id: int) -> None:
stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
stats_logger.incr("reports.execute")
@@ -124,8 +126,10 @@ def prune_log() -> None:
logger.exception("An exception occurred while pruning report schedule logs")
-@celery_app.task(name="prune_query")
-def prune_query(retention_period_days: Optional[int] = None) -> None:
+@celery_app.task(name="prune_query", bind=True)
+def prune_query(
+ self: Task, retention_period_days: int | None = None, **kwargs: Any
+) -> None:
stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
stats_logger.incr("prune_query")
@@ -145,8 +149,10 @@ def prune_query(retention_period_days: Optional[int] = None) -> None:
logger.exception("An error occurred while pruning queries: %s", ex)
-@celery_app.task(name="prune_logs")
-def prune_logs(retention_period_days: Optional[int] = None) -> None:
+@celery_app.task(name="prune_logs", bind=True)
+def prune_logs(
+ self: Task, retention_period_days: int | None = None, **kwargs: Any
+) -> None:
stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"]
stats_logger.incr("prune_logs")