This is an automated email from the ASF dual-hosted git repository.

amitmiran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0c759  refactor: sql lab command: separate concerns into different 
modules (#16917)
0d0c759 is described below

commit 0d0c759cfed3dc72aaca029b5da1db1acfa393a9
Author: ofekisr <[email protected]>
AuthorDate: Sun Oct 3 11:15:46 2021 +0300

    refactor: sql lab command: separate concerns into different modules (#16917)
    
    * chore move sql_execution_context to sqllab package
    
    * add new helper methods into base Dao
    
    * refactor separate get existing query concern from command
    
    * refactor separate query access validation concern
    
    * refactor separate get query's database concern from command
    
    * refactor separate get query rendering concern from command
    
    * refactor sqllab_execution_context
    
    * refactor separate creating payload for view
    
    * chore decouple command from superset app
    
    * fix pylint issues
    
    * fix failed tests
    
    * fix pylint issues
    
    * fix failed test
    
    * fix failed black
    
    * fix failed black
    
    * fix failed test
---
 superset/charts/dao.py                             |   1 +
 superset/dao/base.py                               |  35 ++
 superset/datasets/dao.py                           |   2 +-
 superset/models/core.py                            |   3 +-
 superset/sqllab/command.py                         | 352 +++++----------------
 superset/sqllab/exceptions.py                      |  17 +
 superset/sqllab/execution_context_convertor.py     |  67 ++++
 superset/sqllab/query_render.py                    | 153 +++++++++
 superset/sqllab/sql_json_executer.py               | 207 ++++++++++++
 .../{utils => sqllab}/sqllab_execution_context.py  |   0
 superset/sqllab/validators.py                      |  31 ++
 superset/views/core.py                             |  68 +++-
 tests/integration_tests/celery_tests.py            |   4 +-
 tests/integration_tests/core_tests.py              |   3 +-
 tests/integration_tests/sqllab_tests.py            |   2 +-
 15 files changed, 660 insertions(+), 285 deletions(-)

diff --git a/superset/charts/dao.py b/superset/charts/dao.py
index 2a80f82..8e16f3b 100644
--- a/superset/charts/dao.py
+++ b/superset/charts/dao.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=arguments-renamed
 import logging
 from typing import List, Optional, TYPE_CHECKING
 
diff --git a/superset/dao/base.py b/superset/dao/base.py
index 79ece40..ebd6a89 100644
--- a/superset/dao/base.py
+++ b/superset/dao/base.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=isinstance-second-argument-not-valid-type
 from typing import Any, Dict, List, Optional, Type
 
 from flask_appbuilder.models.filters import BaseFilter
@@ -90,6 +91,19 @@ class BaseDAO:
         return query.all()
 
     @classmethod
+    def find_one_or_none(cls, **filter_by: Any) -> Optional[Model]:
+        """
+        Get the first that fit the `base_filter`
+        """
+        query = db.session.query(cls.model_cls)
+        if cls.base_filter:
+            data_model = SQLAInterface(cls.model_cls, db.session)
+            query = cls.base_filter(  # pylint: disable=not-callable
+                "id", data_model
+            ).apply(query, None)
+        return query.filter_by(**filter_by).one_or_none()
+
+    @classmethod
     def create(cls, properties: Dict[str, Any], commit: bool = True) -> Model:
         """
         Generic for creating models
@@ -110,6 +124,27 @@ class BaseDAO:
         return model
 
     @classmethod
+    def save(cls, instance_model: Model, commit: bool = True) -> Model:
+        """
+        Generic for saving models
+        :raises: DAOCreateFailedError
+        """
+        if cls.model_cls is None:
+            raise DAOConfigError()
+        if not isinstance(instance_model, cls.model_cls):
+            raise DAOCreateFailedError(
+                "the instance model is not a type of the model class"
+            )
+        try:
+            db.session.add(instance_model)
+            if commit:
+                db.session.commit()
+        except SQLAlchemyError as ex:  # pragma: no cover
+            db.session.rollback()
+            raise DAOCreateFailedError(exception=ex) from ex
+        return instance_model
+
+    @classmethod
     def update(
         cls, model: Model, properties: Dict[str, Any], commit: bool = True
     ) -> Model:
diff --git a/superset/datasets/dao.py b/superset/datasets/dao.py
index b58622b..363e89b 100644
--- a/superset/datasets/dao.py
+++ b/superset/datasets/dao.py
@@ -143,7 +143,7 @@ class DatasetDAO(BaseDAO):  # pylint: 
disable=too-many-public-methods
         return len(dataset_query) == 0
 
     @classmethod
-    def update(  # pylint: disable=arguments-differ
+    def update(
         cls, model: SqlaTable, properties: Dict[str, Any], commit: bool = True
     ) -> Optional[SqlaTable]:
         """
diff --git a/superset/models/core.py b/superset/models/core.py
index 28ba2eb..d4e7850 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=line-too-long
 """A collection of ORM sqlalchemy models for Superset"""
 import enum
 import json
@@ -253,7 +254,7 @@ class Database(
     @property
     def parameters_schema(self) -> Dict[str, Any]:
         try:
-            parameters_schema = self.db_engine_spec.parameters_json_schema()  
# type: ignore # pylint: disable=line-too-long
+            parameters_schema = self.db_engine_spec.parameters_json_schema()  
# type: ignore
         except Exception:  # pylint: disable=broad-except
             parameters_schema = {}
         return parameters_schema
diff --git a/superset/sqllab/command.py b/superset/sqllab/command.py
index c9b9df4..cb48549 100644
--- a/superset/sqllab/command.py
+++ b/superset/sqllab/command.py
@@ -14,73 +14,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# pylint: disable=line-too-long
+# pylint: disable=too-few-public-methods, too-many-arguments
 from __future__ import annotations
 
-import dataclasses
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, TYPE_CHECKING
 
-import simplejson as json
-from flask import g
-from flask_babel import gettext as __, ngettext
-from jinja2.exceptions import TemplateError
-from jinja2.meta import find_undeclared_variables
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.orm.session import Session
+from flask_babel import gettext as __
 
-from superset import app, db, is_feature_enabled, sql_lab
 from superset.commands.base import BaseCommand
 from superset.common.db_query_status import QueryStatus
-from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
-from superset.exceptions import (
-    SupersetErrorException,
-    SupersetErrorsException,
-    SupersetGenericDBErrorException,
-    SupersetGenericErrorException,
-    SupersetSecurityException,
-    SupersetTemplateParamsErrorException,
-    SupersetTimeoutException,
-)
-from superset.jinja_context import BaseTemplateProcessor, 
get_template_processor
+from superset.dao.exceptions import DAOCreateFailedError
+from superset.errors import SupersetErrorType
+from superset.exceptions import SupersetErrorsException, 
SupersetGenericErrorException
 from superset.models.core import Database
 from superset.models.sql_lab import Query
-from superset.queries.dao import QueryDAO
 from superset.sqllab.command_status import SqlJsonExecutionStatus
-from superset.sqllab.exceptions import SqlLabException
+from superset.sqllab.exceptions import (
+    QueryIsForbiddenToAccessException,
+    SqlLabException,
+)
 from superset.sqllab.limiting_factor import LimitingFactor
-from superset.sqllab.utils import 
apply_display_max_row_configuration_if_require
-from superset.utils import core as utils
-from superset.utils.dates import now_as_float
-from superset.utils.sqllab_execution_context import SqlJsonExecutionContext
-
-config = app.config
-logger = logging.getLogger(__name__)
 
-PARAMETER_MISSING_ERR = (
-    "Please check your template parameters for syntax errors and make sure "
-    "they match across your SQL query and Set Parameters. Then, try running "
-    "your query again."
-)
+if TYPE_CHECKING:
+    from superset.sqllab.sql_json_executer import SqlJsonExecutor
+    from superset.sqllab.sqllab_execution_context import 
SqlJsonExecutionContext
+    from superset.queries.dao import QueryDAO
+    from superset.databases.dao import DatabaseDAO
 
-SqlResults = Dict[str, Any]
+logger = logging.getLogger(__name__)
 
 CommandResult = Dict[str, Any]
 
 
 class ExecuteSqlCommand(BaseCommand):
     _execution_context: SqlJsonExecutionContext
+    _query_dao: QueryDAO
+    _database_dao: DatabaseDAO
+    _access_validator: CanAccessQueryValidator
+    _sql_query_render: SqlQueryRender
+    _sql_json_executor: SqlJsonExecutor
+    _execution_context_convertor: ExecutionContextConvertor
+    _sqllab_ctas_no_limit: bool
     _log_params: Optional[Dict[str, Any]] = None
-    _session: Session
 
     def __init__(
         self,
         execution_context: SqlJsonExecutionContext,
+        query_dao: QueryDAO,
+        database_dao: DatabaseDAO,
+        access_validator: CanAccessQueryValidator,
+        sql_query_render: SqlQueryRender,
+        sql_json_executor: SqlJsonExecutor,
+        execution_context_convertor: ExecutionContextConvertor,
+        sqllab_ctas_no_limit_flag: bool,
         log_params: Optional[Dict[str, Any]] = None,
     ) -> None:
         self._execution_context = execution_context
+        self._query_dao = query_dao
+        self._database_dao = database_dao
+        self._access_validator = access_validator
+        self._sql_query_render = sql_query_render
+        self._sql_json_executor = sql_json_executor
+        self._execution_context_convertor = execution_context_convertor
+        self._sqllab_ctas_no_limit = sqllab_ctas_no_limit_flag
         self._log_params = log_params
-        self._session = db.session()
 
     def validate(self) -> None:
         pass
@@ -90,7 +88,7 @@ class ExecuteSqlCommand(BaseCommand):
     ) -> CommandResult:
         """Runs arbitrary sql and returns data as json"""
         try:
-            query = self._get_existing_query()
+            query = self._try_get_existing_query()
             if self.is_query_handled(query):
                 self._execution_context.set_query(query)  # type: ignore
                 status = SqlJsonExecutionStatus.QUERY_ALREADY_CREATED
@@ -98,24 +96,21 @@ class ExecuteSqlCommand(BaseCommand):
                 status = self._run_sql_json_exec_from_scratch()
             return {
                 "status": status,
-                "payload": self._create_payload_from_execution_context(status),
+                "payload": self._execution_context_convertor.to_payload(
+                    self._execution_context, status
+                ),
             }
         except (SqlLabException, SupersetErrorsException) as ex:
             raise ex
         except Exception as ex:
             raise SqlLabException(self._execution_context, exception=ex) from 
ex
 
-    def _get_existing_query(self) -> Optional[Query]:
-        query = (
-            self._session.query(Query)
-            .filter_by(
-                client_id=self._execution_context.client_id,
-                user_id=self._execution_context.user_id,
-                sql_editor_id=self._execution_context.sql_editor_id,
-            )
-            .one_or_none()
+    def _try_get_existing_query(self) -> Optional[Query]:
+        return self._query_dao.find_one_or_none(
+            client_id=self._execution_context.client_id,
+            user_id=self._execution_context.user_id,
+            sql_editor_id=self._execution_context.sql_editor_id,
         )
-        return query
 
     @classmethod
     def is_query_handled(cls, query: Optional[Query]) -> bool:
@@ -130,20 +125,20 @@ class ExecuteSqlCommand(BaseCommand):
         query = self._execution_context.create_query()
         self._save_new_query(query)
         try:
-            self._save_new_query(query)
             logger.info("Triggering query_id: %i", query.id)
             self._validate_access(query)
             self._execution_context.set_query(query)
-            rendered_query = self._render_query()
+            rendered_query = 
self._sql_query_render.render(self._execution_context)
             self._set_query_limit_if_required(rendered_query)
-            return self._execute_query(rendered_query)
+            return self._sql_json_executor.execute(
+                self._execution_context, rendered_query, self._log_params
+            )
         except Exception as ex:
-            query.status = QueryStatus.FAILED
-            self._session.commit()
+            self._query_dao.update(query, {"status": QueryStatus.FAILED})
             raise ex
 
     def _get_the_query_db(self) -> Database:
-        mydb = 
self._session.query(Database).get(self._execution_context.database_id)
+        mydb = 
self._database_dao.find_by_id(self._execution_context.database_id)
         self._validate_query_db(mydb)
         return mydb
 
@@ -159,74 +154,21 @@ class ExecuteSqlCommand(BaseCommand):
 
     def _save_new_query(self, query: Query) -> None:
         try:
-            self._session.add(query)
-            self._session.flush()
-            self._session.commit()  # shouldn't be necessary
-        except SQLAlchemyError as ex:
-            logger.error("Errors saving query details %s", str(ex), 
exc_info=True)
-            self._session.rollback()
-        if not query.id:
-            raise SupersetGenericErrorException(
-                __(
-                    "The query record was not created as expected. Please "
-                    "contact an administrator for further assistance or try 
again."
-                )
-            )
+            self._query_dao.save(query)
+        except DAOCreateFailedError as ex:
+            raise SqlLabException(
+                self._execution_context,
+                SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
+                "The query record was not created as expected",
+                ex,
+                "Please contact an administrator for further assistance or try 
again.",
+            ) from ex
 
     def _validate_access(self, query: Query) -> None:
         try:
-            query.raise_for_access()
-        except SupersetSecurityException as ex:
-            query.set_extra_json_key("errors", [dataclasses.asdict(ex.error)])
-            query.status = QueryStatus.FAILED
-            query.error_message = ex.error.message
-            self._session.commit()
-            raise SupersetErrorException(ex.error, status=403) from ex
-
-    def _render_query(self) -> str:
-        def validate(
-            rendered_query: str, template_processor: BaseTemplateProcessor
-        ) -> None:
-            if is_feature_enabled("ENABLE_TEMPLATE_PROCESSING"):
-                # pylint: disable=protected-access
-                ast = template_processor._env.parse(rendered_query)
-                undefined_parameters = find_undeclared_variables(ast)  # type: 
ignore
-                if undefined_parameters:
-                    raise SupersetTemplateParamsErrorException(
-                        message=ngettext(
-                            "The parameter %(parameters)s in your query is 
undefined.",
-                            "The following parameters in your query are 
undefined: %(parameters)s.",
-                            len(undefined_parameters),
-                            parameters=utils.format_list(undefined_parameters),
-                        )
-                        + " "
-                        + PARAMETER_MISSING_ERR,
-                        error=SupersetErrorType.MISSING_TEMPLATE_PARAMS_ERROR,
-                        extra={
-                            "undefined_parameters": list(undefined_parameters),
-                            "template_parameters": 
self._execution_context.template_params,
-                        },
-                    )
-
-        query = self._execution_context.query
-
-        try:
-            template_processor = get_template_processor(
-                database=query.database, query=query
-            )
-            rendered_query = template_processor.process_template(
-                query.sql, **self._execution_context.template_params
-            )
-            validate(rendered_query, template_processor)
-        except TemplateError as ex:
-            raise SupersetTemplateParamsErrorException(
-                message=__(
-                    'The query contains one or more malformed template 
parameters. Please check your query and confirm that all template parameters 
are surround by double braces, for example, "{{ ds }}". Then, try running your 
query again.'
-                ),
-                error=SupersetErrorType.INVALID_TEMPLATE_PARAMS_ERROR,
-            ) from ex
-
-        return rendered_query
+            self._access_validator.validate(query)
+        except Exception as ex:
+            raise QueryIsForbiddenToAccessException(self._execution_context, 
ex) from ex
 
     def _set_query_limit_if_required(self, rendered_query: str,) -> None:
         if self._is_required_to_set_limit():
@@ -234,7 +176,7 @@ class ExecuteSqlCommand(BaseCommand):
 
     def _is_required_to_set_limit(self) -> bool:
         return not (
-            config.get("SQLLAB_CTAS_NO_LIMIT") and 
self._execution_context.select_as_cta
+            self._sqllab_ctas_no_limit and 
self._execution_context.select_as_cta
         )
 
     def _set_query_limit(self, rendered_query: str) -> None:
@@ -255,161 +197,21 @@ class ExecuteSqlCommand(BaseCommand):
             lim for lim in limits if lim is not None
         )
 
-    def _execute_query(self, rendered_query: str,) -> SqlJsonExecutionStatus:
-        # Flag for whether or not to expand data
-        # (feature that will expand Presto row objects and arrays)
-        # Async request.
-        if self._execution_context.is_run_asynchronous():
-            return self._sql_json_async(rendered_query)
-
-        return self._sql_json_sync(rendered_query)
-
-    def _sql_json_async(self, rendered_query: str,) -> SqlJsonExecutionStatus:
-        """
-        Send SQL JSON query to celery workers.
-        :param rendered_query: the rendered query to perform by workers
-        :return: A Flask Response
-        """
-        query = self._execution_context.query
-        logger.info("Query %i: Running query on a Celery worker", query.id)
-        # Ignore the celery future object and the request may time out.
-        query_id = query.id
-        try:
-            task = sql_lab.get_sql_results.delay(
-                query.id,
-                rendered_query,
-                return_results=False,
-                store_results=not query.select_as_cta,
-                user_name=g.user.username
-                if g.user and hasattr(g.user, "username")
-                else None,
-                start_time=now_as_float(),
-                expand_data=self._execution_context.expand_data,
-                log_params=self._log_params,
-            )
-
-            # Explicitly forget the task to ensure the task metadata is 
removed from the
-            # Celery results backend in a timely manner.
-            try:
-                task.forget()
-            except NotImplementedError:
-                logger.warning(
-                    "Unable to forget Celery task as backend"
-                    "does not support this operation"
-                )
-        except Exception as ex:
-            logger.exception("Query %i: %s", query.id, str(ex))
-
-            message = __("Failed to start remote query on a worker.")
-            error = SupersetError(
-                message=message,
-                error_type=SupersetErrorType.ASYNC_WORKERS_ERROR,
-                level=ErrorLevel.ERROR,
-            )
-            error_payload = dataclasses.asdict(error)
-
-            query.set_extra_json_key("errors", [error_payload])
-            query.status = QueryStatus.FAILED
-            query.error_message = message
-            self._session.commit()
 
-            raise SupersetErrorException(error) from ex
+class CanAccessQueryValidator:
+    def validate(self, query: Query) -> None:
+        raise NotImplementedError()
 
-        # Update saved query with execution info from the query execution
-        QueryDAO.update_saved_query_exec_info(query_id)
 
-        self._session.commit()
-        return SqlJsonExecutionStatus.QUERY_IS_RUNNING
+class SqlQueryRender:
+    def render(self, execution_context: SqlJsonExecutionContext) -> str:
+        raise NotImplementedError()
 
-    def _sql_json_sync(self, rendered_query: str) -> SqlJsonExecutionStatus:
-        """
-        Execute SQL query (sql json).
 
-        :param rendered_query: The rendered query (included templates)
-        :raises: SupersetTimeoutException
-        """
-        query = self._execution_context.query
-        try:
-            timeout = config["SQLLAB_TIMEOUT"]
-            timeout_msg = f"The query exceeded the {timeout} seconds timeout."
-            query_id = query.id
-            data = self._get_sql_results_with_timeout(
-                timeout, rendered_query, timeout_msg,
-            )
-            # Update saved query if needed
-            QueryDAO.update_saved_query_exec_info(query_id)
-            self._execution_context.set_execution_result(data)
-        except SupersetTimeoutException as ex:
-            # re-raise exception for api exception handler
-            raise ex
-        except Exception as ex:
-            logger.exception("Query %i failed unexpectedly", query.id)
-            raise SupersetGenericDBErrorException(
-                utils.error_msg_from_exception(ex)
-            ) from ex
-
-        if data is not None and data.get("status") == QueryStatus.FAILED:
-            # new error payload with rich context
-            if data["errors"]:
-                raise SupersetErrorsException(
-                    [SupersetError(**params) for params in data["errors"]]
-                )
-            # old string-only error message
-            raise SupersetGenericDBErrorException(data["error"])
-        return SqlJsonExecutionStatus.HAS_RESULTS
-
-    def _get_sql_results_with_timeout(
-        self, timeout: int, rendered_query: str, timeout_msg: str,
-    ) -> Optional[SqlResults]:
-        query = self._execution_context.query
-        with utils.timeout(seconds=timeout, error_message=timeout_msg):
-            # pylint: disable=no-value-for-parameter
-            return sql_lab.get_sql_results(
-                query.id,
-                rendered_query,
-                return_results=True,
-                store_results=self._is_store_results(query),
-                user_name=g.user.username
-                if g.user and hasattr(g.user, "username")
-                else None,
-                expand_data=self._execution_context.expand_data,
-                log_params=self._log_params,
-            )
-
-    @classmethod
-    def _is_store_results(cls, query: Query) -> bool:
-        return (
-            is_feature_enabled("SQLLAB_BACKEND_PERSISTENCE") and not 
query.select_as_cta
-        )
-
-    def _create_payload_from_execution_context(  # pylint: disable=invalid-name
-        self, status: SqlJsonExecutionStatus,
-    ) -> str:
-
-        if status == SqlJsonExecutionStatus.HAS_RESULTS:
-            return self._to_payload_results_based(
-                self._execution_context.get_execution_result() or {}
-            )
-        return self._to_payload_query_based(self._execution_context.query)
-
-    def _to_payload_results_based(  # pylint: disable=no-self-use
-        self, execution_result: SqlResults
-    ) -> str:
-        display_max_row = config["DISPLAY_MAX_ROW"]
-        return json.dumps(
-            apply_display_max_row_configuration_if_require(
-                execution_result, display_max_row
-            ),
-            default=utils.pessimistic_json_iso_dttm_ser,
-            ignore_nan=True,
-            encoding=None,
-        )
-
-    def _to_payload_query_based(  # pylint: disable=no-self-use
-        self, query: Query
+class ExecutionContextConvertor:
+    def to_payload(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        execution_status: SqlJsonExecutionStatus,
     ) -> str:
-        return json.dumps(
-            {"query": query.to_dict()},
-            default=utils.json_int_dttm_ser,
-            ignore_nan=True,
-        )
+        raise NotImplementedError()
diff --git a/superset/sqllab/exceptions.py b/superset/sqllab/exceptions.py
index 6b1736c..ac632d7 100644
--- a/superset/sqllab/exceptions.py
+++ b/superset/sqllab/exceptions.py
@@ -81,3 +81,20 @@ class SqlLabException(SupersetException):
                 return ": {}".format(exception.message)  # type: ignore
             return ": {}".format(str(exception))
         return ""
+
+
+QUERY_IS_FORBIDDEN_TO_ACCESS_REASON_MESSAGE = "can not access the query"
+
+
+class QueryIsForbiddenToAccessException(SqlLabException):
+    def __init__(
+        self,
+        sql_json_execution_context: SqlJsonExecutionContext,
+        exception: Optional[Exception] = None,
+    ) -> None:
+        super().__init__(
+            sql_json_execution_context,
+            SupersetErrorType.QUERY_SECURITY_ACCESS_ERROR,
+            QUERY_IS_FORBIDDEN_TO_ACCESS_REASON_MESSAGE,
+            exception,
+        )
diff --git a/superset/sqllab/execution_context_convertor.py 
b/superset/sqllab/execution_context_convertor.py
new file mode 100644
index 0000000..6d52355
--- /dev/null
+++ b/superset/sqllab/execution_context_convertor.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import simplejson as json
+
+import superset.utils.core as utils
+from superset.sqllab.command import ExecutionContextConvertor
+from superset.sqllab.command_status import SqlJsonExecutionStatus
+from superset.sqllab.utils import 
apply_display_max_row_configuration_if_require
+
+if TYPE_CHECKING:
+    from superset.sqllab.sqllab_execution_context import 
SqlJsonExecutionContext
+    from superset.sqllab.sql_json_executer import SqlResults
+    from superset.models.sql_lab import Query
+
+
+class ExecutionContextConvertorImpl(ExecutionContextConvertor):
+    _max_row_in_display_configuration: int  # pylint: disable=invalid-name
+
+    def set_max_row_in_display(self, value: int) -> None:
+        self._max_row_in_display_configuration = value  # pylint: 
disable=invalid-name
+
+    def to_payload(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        execution_status: SqlJsonExecutionStatus,
+    ) -> str:
+
+        if execution_status == SqlJsonExecutionStatus.HAS_RESULTS:
+            return self._to_payload_results_based(
+                execution_context.get_execution_result() or {}
+            )
+        return self._to_payload_query_based(execution_context.query)
+
+    def _to_payload_results_based(self, execution_result: SqlResults) -> str:
+        return json.dumps(
+            apply_display_max_row_configuration_if_require(
+                execution_result, self._max_row_in_display_configuration
+            ),
+            default=utils.pessimistic_json_iso_dttm_ser,
+            ignore_nan=True,
+            encoding=None,
+        )
+
+    def _to_payload_query_based(  # pylint: disable=no-self-use
+        self, query: Query
+    ) -> str:
+        return json.dumps(
+            {"query": query.to_dict()}, default=utils.json_int_dttm_ser, 
ignore_nan=True
+        )
diff --git a/superset/sqllab/query_render.py b/superset/sqllab/query_render.py
new file mode 100644
index 0000000..b03b21d
--- /dev/null
+++ b/superset/sqllab/query_render.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=invalid-name, no-self-use, too-few-public-methods, 
too-many-arguments
+from __future__ import annotations
+
+from typing import Any, Callable, Dict, Optional, TYPE_CHECKING
+
+from flask_babel import gettext as __, ngettext
+from jinja2 import TemplateError
+from jinja2.meta import find_undeclared_variables
+
+from superset import is_feature_enabled
+from superset.errors import SupersetErrorType
+from superset.sqllab.command import SqlQueryRender
+from superset.sqllab.exceptions import SqlLabException
+from superset.utils import core as utils
+
+MSG_OF_1006 = "Issue 1006 - One or more parameters specified in the query are 
missing."
+
+if TYPE_CHECKING:
+    from superset.sqllab.sqllab_execution_context import 
SqlJsonExecutionContext
+    from superset.jinja_context import BaseTemplateProcessor
+
+PARAMETER_MISSING_ERR = (
+    "Please check your template parameters for syntax errors and make sure "
+    "they match across your SQL query and Set Parameters. Then, try running "
+    "your query again."
+)
+
+
+class SqlQueryRenderImpl(SqlQueryRender):
+    _sql_template_processor_factory: Callable[..., BaseTemplateProcessor]
+
+    def __init__(
+        self, sql_template_factory: Callable[..., BaseTemplateProcessor]
+    ) -> None:
+
+        self._sql_template_processor_factory = sql_template_factory  # type: 
ignore
+
+    def render(self, execution_context: SqlJsonExecutionContext) -> str:
+        query_model = execution_context.query
+        try:
+            sql_template_processor = self._sql_template_processor_factory(
+                database=query_model.database, query=query_model
+            )
+
+            rendered_query = sql_template_processor.process_template(
+                query_model.sql, **execution_context.template_params
+            )
+            self._validate(execution_context, rendered_query, 
sql_template_processor)
+            return rendered_query
+        except TemplateError as ex:
+            self._raise_template_exception(ex, execution_context)
+            return "NOT_REACHABLE_CODE"
+
+    def _validate(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        sql_template_processor: BaseTemplateProcessor,
+    ) -> None:
+        if is_feature_enabled("ENABLE_TEMPLATE_PROCESSING"):
+            # pylint: disable=protected-access
+            syntax_tree = sql_template_processor._env.parse(rendered_query)
+            undefined_parameters = find_undeclared_variables(  # type: ignore
+                syntax_tree
+            )
+            if undefined_parameters:
+                self._raise_undefined_parameter_exception(
+                    execution_context, undefined_parameters
+                )
+
+    def _raise_undefined_parameter_exception(
+        self, execution_context: SqlJsonExecutionContext, 
undefined_parameters: Any
+    ) -> None:
+        raise SqlQueryRenderException(
+            sql_json_execution_context=execution_context,
+            error_type=SupersetErrorType.MISSING_TEMPLATE_PARAMS_ERROR,
+            reason_message=ngettext(
+                "The parameter %(parameters)s in your query is undefined.",
+                "The following parameters in your query are undefined: 
%(parameters)s.",
+                len(undefined_parameters),
+                parameters=utils.format_list(undefined_parameters),
+            ),
+            suggestion_help_msg=PARAMETER_MISSING_ERR,
+            extra={
+                "undefined_parameters": list(undefined_parameters),
+                "template_parameters": execution_context.template_params,
+                "issue_codes": [{"code": 1006, "message": MSG_OF_1006,}],
+            },
+        )
+
+    def _raise_template_exception(
+        self, ex: Exception, execution_context: SqlJsonExecutionContext
+    ) -> None:
+        raise SqlQueryRenderException(
+            sql_json_execution_context=execution_context,
+            error_type=SupersetErrorType.INVALID_TEMPLATE_PARAMS_ERROR,
+            reason_message=__(
+                "The query contains one or more malformed template parameters."
+            ),
+            suggestion_help_msg=__(
+                "Please check your query and confirm that all template "
+                "parameters are surround by double braces, for example, "
+                '"{{ ds }}". Then, try running your query again.'
+            ),
+        ) from ex
+
+
+class SqlQueryRenderException(SqlLabException):
+    _extra: Optional[Dict[str, Any]]
+
+    def __init__(
+        self,
+        sql_json_execution_context: SqlJsonExecutionContext,
+        error_type: SupersetErrorType,
+        reason_message: Optional[str] = None,
+        exception: Optional[Exception] = None,
+        suggestion_help_msg: Optional[str] = None,
+        extra: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        super().__init__(
+            sql_json_execution_context,
+            error_type,
+            reason_message,
+            exception,
+            suggestion_help_msg,
+        )
+        self._extra = extra
+
+    @property
+    def extra(self) -> Optional[Dict[str, Any]]:
+        return self._extra
+
+    def to_dict(self) -> Dict[str, Any]:
+        rv = super().to_dict()
+        if self._extra:
+            rv["extra"] = self._extra
+        return rv
diff --git a/superset/sqllab/sql_json_executer.py 
b/superset/sqllab/sql_json_executer.py
new file mode 100644
index 0000000..77023b3
--- /dev/null
+++ b/superset/sqllab/sql_json_executer.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=too-few-public-methods, invalid-name
+from __future__ import annotations
+
+import dataclasses
+import logging
+from abc import ABC
+from typing import Any, Callable, Dict, Optional, TYPE_CHECKING
+
+from flask import g
+from flask_babel import gettext as __
+
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+    SupersetGenericDBErrorException,
+    SupersetTimeoutException,
+)
+from superset.sqllab.command_status import SqlJsonExecutionStatus
+from superset.utils import core as utils
+from superset.utils.dates import now_as_float
+
+if TYPE_CHECKING:
+    from superset.queries.dao import QueryDAO
+    from superset.sqllab.sqllab_execution_context import 
SqlJsonExecutionContext
+
+QueryStatus = utils.QueryStatus
+logger = logging.getLogger(__name__)
+
+SqlResults = Dict[str, Any]
+
+GetSqlResultsTask = Callable[..., SqlResults]
+
+
+class SqlJsonExecutor:
+    def execute(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        log_params: Optional[Dict[str, Any]],
+    ) -> SqlJsonExecutionStatus:
+        raise NotImplementedError()
+
+
+class SqlJsonExecutorBase(SqlJsonExecutor, ABC):
+    _query_dao: QueryDAO
+    _get_sql_results_task: GetSqlResultsTask
+
+    def __init__(self, query_dao: QueryDAO, get_sql_results_task: 
GetSqlResultsTask):
+        self._query_dao = query_dao
+        self._get_sql_results_task = get_sql_results_task  # type: ignore
+
+
+class SynchronousSqlJsonExecutor(SqlJsonExecutorBase):
+    _timeout_duration_in_seconds: int
+    _sqllab_backend_persistence_feature_enable: bool
+
+    def __init__(
+        self,
+        query_dao: QueryDAO,
+        get_sql_results_task: GetSqlResultsTask,
+        timeout_duration_in_seconds: int,
+        sqllab_backend_persistence_feature_enable: bool,
+    ):
+        super().__init__(query_dao, get_sql_results_task)
+        self._timeout_duration_in_seconds = timeout_duration_in_seconds
+        self._sqllab_backend_persistence_feature_enable = (
+            sqllab_backend_persistence_feature_enable
+        )
+
+    def execute(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        log_params: Optional[Dict[str, Any]],
+    ) -> SqlJsonExecutionStatus:
+        query_id = execution_context.query.id
+        try:
+            data = self._get_sql_results_with_timeout(
+                execution_context, rendered_query, log_params
+            )
+            self._query_dao.update_saved_query_exec_info(query_id)
+            execution_context.set_execution_result(data)
+        except SupersetTimeoutException as ex:
+            raise ex
+        except Exception as ex:
+            logger.exception("Query %i failed unexpectedly", query_id)
+            raise SupersetGenericDBErrorException(
+                utils.error_msg_from_exception(ex)
+            ) from ex
+
+        if data.get("status") == QueryStatus.FAILED:  # type: ignore
+            # new error payload with rich context
+            if data["errors"]:  # type: ignore
+                raise SupersetErrorsException(
+                    [SupersetError(**params) for params in data["errors"]]  # 
type: ignore
+                )
+            # old string-only error message
+            raise SupersetGenericDBErrorException(data["error"])  # type: 
ignore
+
+        return SqlJsonExecutionStatus.HAS_RESULTS
+
+    def _get_sql_results_with_timeout(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        log_params: Optional[Dict[str, Any]],
+    ) -> Optional[SqlResults]:
+        with utils.timeout(
+            seconds=self._timeout_duration_in_seconds,
+            error_message=self._get_timeout_error_msg(),
+        ):
+            return self._get_sql_results(execution_context, rendered_query, 
log_params)
+
+    def _get_sql_results(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        log_params: Optional[Dict[str, Any]],
+    ) -> Optional[SqlResults]:
+        return self._get_sql_results_task(
+            execution_context.query.id,
+            rendered_query,
+            return_results=True,
+            store_results=self._is_store_results(execution_context),
+            user_name=g.user.username
+            if g.user and hasattr(g.user, "username")
+            else None,
+            expand_data=execution_context.expand_data,
+            log_params=log_params,
+        )
+
+    def _is_store_results(self, execution_context: SqlJsonExecutionContext) -> 
bool:
+        return (
+            self._sqllab_backend_persistence_feature_enable
+            and not execution_context.select_as_cta
+        )
+
+    def _get_timeout_error_msg(self) -> str:
+        return "The query exceeded the {timeout} seconds timeout.".format(
+            timeout=self._timeout_duration_in_seconds
+        )
+
+
+class ASynchronousSqlJsonExecutor(SqlJsonExecutorBase):
+    def execute(
+        self,
+        execution_context: SqlJsonExecutionContext,
+        rendered_query: str,
+        log_params: Optional[Dict[str, Any]],
+    ) -> SqlJsonExecutionStatus:
+
+        query_id = execution_context.query.id
+        logger.info("Query %i: Running query on a Celery worker", query_id)
+        try:
+            task = self._get_sql_results_task.delay(  # type: ignore
+                query_id,
+                rendered_query,
+                return_results=False,
+                store_results=not execution_context.select_as_cta,
+                user_name=g.user.username
+                if g.user and hasattr(g.user, "username")
+                else None,
+                start_time=now_as_float(),
+                expand_data=execution_context.expand_data,
+                log_params=log_params,
+            )
+            try:
+                task.forget()
+            except NotImplementedError:
+                logger.warning(
+                    "Unable to forget Celery task as backend"
+                    "does not support this operation"
+                )
+        except Exception as ex:
+            logger.exception("Query %i: %s", query_id, str(ex))
+
+            message = __("Failed to start remote query on a worker.")
+            error = SupersetError(
+                message=message,
+                error_type=SupersetErrorType.ASYNC_WORKERS_ERROR,
+                level=ErrorLevel.ERROR,
+            )
+            error_payload = dataclasses.asdict(error)
+            query = execution_context.query
+            query.set_extra_json_key("errors", [error_payload])
+            query.status = QueryStatus.FAILED
+            query.error_message = message
+            raise SupersetErrorException(error) from ex
+        self._query_dao.update_saved_query_exec_info(query_id)
+        return SqlJsonExecutionStatus.QUERY_IS_RUNNING
diff --git a/superset/utils/sqllab_execution_context.py 
b/superset/sqllab/sqllab_execution_context.py
similarity index 100%
rename from superset/utils/sqllab_execution_context.py
rename to superset/sqllab/sqllab_execution_context.py
diff --git a/superset/sqllab/validators.py b/superset/sqllab/validators.py
new file mode 100644
index 0000000..726a276
--- /dev/null
+++ b/superset/sqllab/validators.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=too-few-public-methods
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from superset import security_manager
+from superset.sqllab.command import CanAccessQueryValidator
+
+if TYPE_CHECKING:
+    from superset.models.sql_lab import Query
+
+
+class CanAccessQueryValidatorImpl(CanAccessQueryValidator):
+    def validate(self, query: Query) -> None:
+        security_manager.raise_for_access(query=query)
diff --git a/superset/views/core.py b/superset/views/core.py
index dadda30..c9bcac7 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -14,7 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# pylint: disable=too-many-lines
+# pylint: disable=too-many-lines, invalid-name
 from __future__ import annotations
 
 import logging
@@ -95,14 +95,28 @@ from superset.models.datasource_access_request import 
DatasourceAccessRequest
 from superset.models.slice import Slice
 from superset.models.sql_lab import Query, TabState
 from superset.models.user_attributes import UserAttribute
+from superset.queries.dao import QueryDAO
 from superset.security.analytics_db_safety import check_sqlalchemy_uri
+from superset.sql_lab import get_sql_results
 from superset.sql_parse import ParsedQuery, Table
 from superset.sql_validators import get_validator_by_name
 from superset.sqllab.command import CommandResult, ExecuteSqlCommand
 from superset.sqllab.command_status import SqlJsonExecutionStatus
-from superset.sqllab.exceptions import SqlLabException
+from superset.sqllab.exceptions import (
+    QueryIsForbiddenToAccessException,
+    SqlLabException,
+)
+from superset.sqllab.execution_context_convertor import 
ExecutionContextConvertorImpl
 from superset.sqllab.limiting_factor import LimitingFactor
+from superset.sqllab.query_render import SqlQueryRenderImpl
+from superset.sqllab.sql_json_executer import (
+    ASynchronousSqlJsonExecutor,
+    SqlJsonExecutor,
+    SynchronousSqlJsonExecutor,
+)
+from superset.sqllab.sqllab_execution_context import SqlJsonExecutionContext
 from superset.sqllab.utils import 
apply_display_max_row_configuration_if_require
+from superset.sqllab.validators import CanAccessQueryValidatorImpl
 from superset.tasks.async_queries import load_explore_json_into_cache
 from superset.typing import FlaskResponse
 from superset.utils import core as utils, csv
@@ -111,7 +125,6 @@ from superset.utils.cache import etag_cache
 from superset.utils.core import apply_max_row_limit, ReservedUrlParameters
 from superset.utils.dates import now_as_float
 from superset.utils.decorators import check_dashboard_access
-from superset.utils.sqllab_execution_context import SqlJsonExecutionContext
 from superset.views.base import (
     api,
     BaseSupersetView,
@@ -2440,13 +2453,60 @@ class Superset(BaseSupersetView):  # pylint: 
disable=too-many-public-methods
                 "user_agent": cast(Optional[str], 
request.headers.get("USER_AGENT"))
             }
             execution_context = SqlJsonExecutionContext(request.json)
-            command = ExecuteSqlCommand(execution_context, log_params)
+            command = self._create_sql_json_command(execution_context, 
log_params)
             command_result: CommandResult = command.run()
             return self._create_response_from_execution_context(command_result)
         except SqlLabException as ex:
+            logger.error(ex.message)
+            self._set_http_status_into_Sql_lab_exception(ex)
             payload = {"errors": [ex.to_dict()]}
             return json_error_response(status=ex.status, payload=payload)
 
+    @staticmethod
+    def _create_sql_json_command(
+        execution_context: SqlJsonExecutionContext, log_params: 
Optional[Dict[str, Any]]
+    ) -> ExecuteSqlCommand:
+        query_dao = QueryDAO()
+        sql_json_executor = Superset._create_sql_json_executor(
+            execution_context, query_dao
+        )
+        execution_context_convertor = ExecutionContextConvertorImpl()
+        execution_context_convertor.set_max_row_in_display(
+            int(config.get("DISPLAY_MAX_ROW"))  # type: ignore
+        )
+        return ExecuteSqlCommand(
+            execution_context,
+            query_dao,
+            DatabaseDAO(),
+            CanAccessQueryValidatorImpl(),
+            SqlQueryRenderImpl(get_template_processor),
+            sql_json_executor,
+            execution_context_convertor,
+            config.get("SQLLAB_CTAS_NO_LIMIT"),  # type: ignore
+            log_params,
+        )
+
+    @staticmethod
+    def _create_sql_json_executor(
+        execution_context: SqlJsonExecutionContext, query_dao: QueryDAO
+    ) -> SqlJsonExecutor:
+        sql_json_executor: SqlJsonExecutor
+        if execution_context.is_run_asynchronous():
+            sql_json_executor = ASynchronousSqlJsonExecutor(query_dao, 
get_sql_results)
+        else:
+            sql_json_executor = SynchronousSqlJsonExecutor(
+                query_dao,
+                get_sql_results,
+                config.get("SQLLAB_TIMEOUT"),  # type: ignore
+                is_feature_enabled("SQLLAB_BACKEND_PERSISTENCE"),
+            )
+        return sql_json_executor
+
+    @staticmethod
+    def _set_http_status_into_Sql_lab_exception(ex: SqlLabException) -> None:
+        if isinstance(ex, QueryIsForbiddenToAccessException):
+            ex.status = 403
+
     def _create_response_from_execution_context(  # pylint: 
disable=invalid-name, no-self-use
         self, command_result: CommandResult,
     ) -> FlaskResponse:
diff --git a/tests/integration_tests/celery_tests.py 
b/tests/integration_tests/celery_tests.py
index f4224d2..c7465bc 100644
--- a/tests/integration_tests/celery_tests.py
+++ b/tests/integration_tests/celery_tests.py
@@ -217,7 +217,7 @@ def test_run_sync_query_cta_no_data(setup_sqllab):
 @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
 @pytest.mark.parametrize("ctas_method", [CtasMethod.TABLE, CtasMethod.VIEW])
 @mock.patch(
-    "superset.utils.sqllab_execution_context.get_cta_schema_name",
+    "superset.sqllab.sqllab_execution_context.get_cta_schema_name",
     lambda d, u, s, sql: CTAS_SCHEMA_NAME,
 )
 def test_run_sync_query_cta_config(setup_sqllab, ctas_method):
@@ -245,7 +245,7 @@ def test_run_sync_query_cta_config(setup_sqllab, 
ctas_method):
 @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
 @pytest.mark.parametrize("ctas_method", [CtasMethod.TABLE, CtasMethod.VIEW])
 @mock.patch(
-    "superset.utils.sqllab_execution_context.get_cta_schema_name",
+    "superset.sqllab.sqllab_execution_context.get_cta_schema_name",
     lambda d, u, s, sql: CTAS_SCHEMA_NAME,
 )
 def test_run_async_query_cta_config(setup_sqllab, ctas_method):
diff --git a/tests/integration_tests/core_tests.py 
b/tests/integration_tests/core_tests.py
index f91ab45..c35eb1b 100644
--- a/tests/integration_tests/core_tests.py
+++ b/tests/integration_tests/core_tests.py
@@ -749,10 +749,11 @@ class TestCore(SupersetTestCase):
         data = self.run_sql(sql, "fdaklj3ws")
         self.assertEqual(data["data"][0]["test"], "2")
 
+    @pytest.mark.ofek
     @mock.patch(
         
"tests.integration_tests.superset_test_custom_template_processors.datetime"
     )
-    @mock.patch("superset.sql_lab.get_sql_results")
+    @mock.patch("superset.views.core.get_sql_results")
     def test_custom_templated_sql_json(self, sql_lab_mock, mock_dt) -> None:
         """Test sqllab receives macros expanded query."""
         mock_dt.utcnow = mock.Mock(return_value=datetime.datetime(1970, 1, 1))
diff --git a/tests/integration_tests/sqllab_tests.py 
b/tests/integration_tests/sqllab_tests.py
index 1e97514..b6dea6c 100644
--- a/tests/integration_tests/sqllab_tests.py
+++ b/tests/integration_tests/sqllab_tests.py
@@ -189,7 +189,7 @@ class TestSqlLab(SupersetTestCase):
             return
 
         with mock.patch(
-            "superset.utils.sqllab_execution_context.get_cta_schema_name",
+            "superset.sqllab.sqllab_execution_context.get_cta_schema_name",
             lambda d, u, s, sql: f"{u.username}_database",
         ):
             old_allow_ctas = examples_db.allow_ctas

Reply via email to