This is an automated email from the ASF dual-hosted git repository.
dpgaspar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 727847d fix: remove unnecessary app context on celery (#15422)
727847d is described below
commit 727847d2e57d64ef30ddd0d7215187e334d006c2
Author: Daniel Vaz Gaspar <[email protected]>
AuthorDate: Tue Jun 29 12:16:16 2021 +0100
fix: remove unnecessary app context on celery (#15422)
* fix: remove unnecessary app context on celery
* fix lint
* fix lint
---
superset/tasks/async_queries.py | 149 +++++++++++++++++++---------------------
superset/tasks/thumbnails.py | 60 ++++++++--------
2 files changed, 98 insertions(+), 111 deletions(-)
diff --git a/superset/tasks/async_queries.py b/superset/tasks/async_queries.py
index 5fbe39e..8e7d2ea 100644
--- a/superset/tasks/async_queries.py
+++ b/superset/tasks/async_queries.py
@@ -22,7 +22,6 @@ from typing import Any, cast, Dict, Optional
from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app, g
-from superset import app
from superset.exceptions import SupersetVizException
from superset.extensions import (
async_query_manager,
@@ -53,32 +52,27 @@ def load_chart_data_into_cache(
) -> None:
from superset.charts.commands.data import ChartDataCommand
- with app.app_context(): # type: ignore
- try:
- ensure_user_is_set(job_metadata.get("user_id"))
- command = ChartDataCommand()
- command.set_query_context(form_data)
- result = command.run(cache=True)
- cache_key = result["cache_key"]
- result_url = f"/api/v1/chart/data/{cache_key}"
- async_query_manager.update_job(
- job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
- )
- except SoftTimeLimitExceeded as exc:
- logger.warning(
- "A timeout occurred while loading chart data, error: %s", exc
- )
- raise exc
- except Exception as exc:
- # TODO: QueryContext should support SIP-40 style errors
- error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
- errors = [{"message": error}]
- async_query_manager.update_job(
- job_metadata, async_query_manager.STATUS_ERROR, errors=errors
- )
- raise exc
-
- return None
+ try:
+ ensure_user_is_set(job_metadata.get("user_id"))
+ command = ChartDataCommand()
+ command.set_query_context(form_data)
+ result = command.run(cache=True)
+ cache_key = result["cache_key"]
+ result_url = f"/api/v1/chart/data/{cache_key}"
+ async_query_manager.update_job(
+ job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
+ )
+ except SoftTimeLimitExceeded as exc:
+ logger.warning("A timeout occurred while loading chart data, error: %s", exc)
+ raise exc
+ except Exception as exc:
+ # TODO: QueryContext should support SIP-40 style errors
+ error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
+ errors = [{"message": error}]
+ async_query_manager.update_job(
+ job_metadata, async_query_manager.STATUS_ERROR, errors=errors
+ )
+ raise exc
@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
@@ -88,58 +82,53 @@ def load_explore_json_into_cache( # pylint: disable=too-many-locals
response_type: Optional[str] = None,
force: bool = False,
) -> None:
- with app.app_context(): # type: ignore
- cache_key_prefix = "ejr-" # ejr: explore_json request
- try:
- ensure_user_is_set(job_metadata.get("user_id"))
- datasource_id, datasource_type = get_datasource_info(None, None, form_data)
-
- # Perform a deep copy here so that below we can cache the original
- # value of the form_data object. This is necessary since the viz
- # objects modify the form_data object. If the modified version were
- # to be cached here, it will lead to a cache miss when clients
- # attempt to retrieve the value of the completed async query.
- original_form_data = copy.deepcopy(form_data)
-
- viz_obj = get_viz(
- datasource_type=cast(str, datasource_type),
- datasource_id=datasource_id,
- form_data=form_data,
- force=force,
- )
- # run query & cache results
- payload = viz_obj.get_payload()
- if viz_obj.has_error(payload):
- raise SupersetVizException(errors=payload["errors"])
-
- # Cache the original form_data value for async retrieval
- cache_value = {
- "form_data": original_form_data,
- "response_type": response_type,
- }
- cache_key = generate_cache_key(cache_value, cache_key_prefix)
- set_and_log_cache(cache_manager.cache, cache_key, cache_value)
- result_url = f"/superset/explore_json/data/{cache_key}"
- async_query_manager.update_job(
- job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
- )
- except SoftTimeLimitExceeded as ex:
- logger.warning(
- "A timeout occurred while loading explore json, error: %s", ex
- )
- raise ex
- except Exception as exc:
- if isinstance(exc, SupersetVizException):
- errors = exc.errors # pylint: disable=no-member
- else:
- error = (
- exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
- )
- errors = [error]
-
- async_query_manager.update_job(
- job_metadata, async_query_manager.STATUS_ERROR, errors=errors
+ cache_key_prefix = "ejr-" # ejr: explore_json request
+ try:
+ ensure_user_is_set(job_metadata.get("user_id"))
+ datasource_id, datasource_type = get_datasource_info(None, None, form_data)
+
+ # Perform a deep copy here so that below we can cache the original
+ # value of the form_data object. This is necessary since the viz
+ # objects modify the form_data object. If the modified version were
+ # to be cached here, it will lead to a cache miss when clients
+ # attempt to retrieve the value of the completed async query.
+ original_form_data = copy.deepcopy(form_data)
+
+ viz_obj = get_viz(
+ datasource_type=cast(str, datasource_type),
+ datasource_id=datasource_id,
+ form_data=form_data,
+ force=force,
+ )
+ # run query & cache results
+ payload = viz_obj.get_payload()
+ if viz_obj.has_error(payload):
+ raise SupersetVizException(errors=payload["errors"])
+
+ # Cache the original form_data value for async retrieval
+ cache_value = {
+ "form_data": original_form_data,
+ "response_type": response_type,
+ }
+ cache_key = generate_cache_key(cache_value, cache_key_prefix)
+ set_and_log_cache(cache_manager.cache, cache_key, cache_value)
+ result_url = f"/superset/explore_json/data/{cache_key}"
+ async_query_manager.update_job(
+ job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
+ )
+ except SoftTimeLimitExceeded as ex:
+ logger.warning("A timeout occurred while loading explore json, error: %s", ex)
+ raise ex
+ except Exception as exc:
+ if isinstance(exc, SupersetVizException):
+ errors = exc.errors # pylint: disable=no-member
+ else:
+ error = (
+ exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
)
- raise exc
+ errors = [error]
- return None
+ async_query_manager.update_job(
+ job_metadata, async_query_manager.STATUS_ERROR, errors=errors
+ )
+ raise exc
diff --git a/superset/tasks/thumbnails.py b/superset/tasks/thumbnails.py
index 4ca4f27..5e4b8df 100644
--- a/superset/tasks/thumbnails.py
+++ b/superset/tasks/thumbnails.py
@@ -22,7 +22,7 @@ from typing import Optional
from flask import current_app
-from superset import app, security_manager, thumbnail_cache
+from superset import security_manager, thumbnail_cache
from superset.extensions import celery_app
from superset.utils.celery import session_scope
from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot
@@ -39,40 +39,38 @@ def cache_chart_thumbnail(
window_size: Optional[WindowSize] = None,
thumb_size: Optional[WindowSize] = None,
) -> None:
- with app.app_context(): # type: ignore
- if not thumbnail_cache:
- logger.warning("No cache set, refusing to compute")
- return None
- logger.info("Caching chart: %s", url)
- screenshot = ChartScreenshot(url, digest)
- with session_scope(nullpool=True) as session:
- user = security_manager.get_user_by_username(
- current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
- )
- screenshot.compute_and_cache(
- user=user,
- cache=thumbnail_cache,
- force=force,
- window_size=window_size,
- thumb_size=thumb_size,
- )
+ if not thumbnail_cache:
+ logger.warning("No cache set, refusing to compute")
return None
+ logger.info("Caching chart: %s", url)
+ screenshot = ChartScreenshot(url, digest)
+ with session_scope(nullpool=True) as session:
+ user = security_manager.get_user_by_username(
+ current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
+ )
+ screenshot.compute_and_cache(
+ user=user,
+ cache=thumbnail_cache,
+ force=force,
+ window_size=window_size,
+ thumb_size=thumb_size,
+ )
+ return None
@celery_app.task(name="cache_dashboard_thumbnail", soft_time_limit=300)
def cache_dashboard_thumbnail(
url: str, digest: str, force: bool = False, thumb_size: Optional[WindowSize] = None
) -> None:
- with app.app_context(): # type: ignore
- if not thumbnail_cache:
- logging.warning("No cache set, refusing to compute")
- return
- logger.info("Caching dashboard: %s", url)
- screenshot = DashboardScreenshot(url, digest)
- with session_scope(nullpool=True) as session:
- user = security_manager.get_user_by_username(
- current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
- )
- screenshot.compute_and_cache(
- user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size,
- )
+ if not thumbnail_cache:
+ logging.warning("No cache set, refusing to compute")
+ return
+ logger.info("Caching dashboard: %s", url)
+ screenshot = DashboardScreenshot(url, digest)
+ with session_scope(nullpool=True) as session:
+ user = security_manager.get_user_by_username(
+ current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
+ )
+ screenshot.compute_and_cache(
+ user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size,
+ )