Minor: move doQueryWithCache and its query-cache helpers from QueryController to QueryService Signed-off-by: Li Yang <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/44cf9fba Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/44cf9fba Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/44cf9fba Branch: refs/heads/master-cdh5.7 Commit: 44cf9fba5900252fb3b4364d4bd129d83212d51c Parents: ef44b7f Author: Roger Shi <ro...@kyligence.io> Authored: Mon Oct 17 13:21:37 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Mon Oct 17 13:33:08 2016 +0800 ---------------------------------------------------------------------- .../kylin/rest/controller/QueryController.java | 136 +------------------ .../apache/kylin/rest/service/CacheService.java | 5 +- .../apache/kylin/rest/service/QueryService.java | 132 +++++++++++++++++- .../rest/controller/QueryControllerTest.java | 2 +- 4 files changed, 134 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/44cf9fba/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java index 9471937..c5f896d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java @@ -23,15 +23,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import javax.annotation.PostConstruct; import javax.servlet.http.HttpServletResponse; import org.apache.commons.io.IOUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; -import 
org.apache.kylin.rest.metrics.QueryMetricsFacade; import org.apache.kylin.rest.model.Query; import org.apache.kylin.rest.model.SelectedColumnMeta; import org.apache.kylin.rest.model.TableMeta; @@ -41,12 +36,9 @@ import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.request.SaveSqlRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.QueryService; -import org.apache.kylin.rest.util.QueryUtil; -import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.access.AccessDeniedException; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; @@ -58,12 +50,6 @@ import org.supercsv.io.CsvListWriter; import org.supercsv.io.ICsvListWriter; import org.supercsv.prefs.CsvPreference; -import com.google.common.base.Preconditions; - -import net.sf.ehcache.Cache; -import net.sf.ehcache.CacheManager; -import net.sf.ehcache.Element; - /** * Handle query requests. 
* @@ -74,31 +60,20 @@ public class QueryController extends BasicController { private static final Logger logger = LoggerFactory.getLogger(QueryController.class); - public static final String SUCCESS_QUERY_CACHE = "StorageCache"; - public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache"; - @Autowired private QueryService queryService; - @Autowired - private CacheManager cacheManager; - - @PostConstruct - public void init() throws IOException { - Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet"); - } - @RequestMapping(value = "/query", method = RequestMethod.POST) @ResponseBody public SQLResponse query(@RequestBody SQLRequest sqlRequest) { - return doQueryWithCache(sqlRequest); + return queryService.doQueryWithCache(sqlRequest); } // TODO should be just "prepare" a statement, get back expected ResultSetMetaData @RequestMapping(value = "/query/prestate", method = RequestMethod.POST, produces = "application/json") @ResponseBody public SQLResponse prepareQuery(@RequestBody PrepareSqlRequest sqlRequest) { - return doQueryWithCache(sqlRequest); + return queryService.doQueryWithCache(sqlRequest); } @RequestMapping(value = "/saved_queries", method = RequestMethod.POST) @@ -127,7 +102,7 @@ public class QueryController extends BasicController { @RequestMapping(value = "/query/format/{format}", method = RequestMethod.GET) @ResponseBody public void downloadQueryResult(@PathVariable String format, SQLRequest sqlRequest, HttpServletResponse response) { - SQLResponse result = doQueryWithCache(sqlRequest); + SQLResponse result = queryService.doQueryWithCache(sqlRequest); response.setContentType("text/" + format + ";charset=utf-8"); response.setHeader("Content-Disposition", "attachment; filename=\"result." 
+ format + "\""); ICsvListWriter csvWriter = null; @@ -164,112 +139,7 @@ public class QueryController extends BasicController { } } - private SQLResponse doQueryWithCache(SQLRequest sqlRequest) { - try { - BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles()); - - String sql = sqlRequest.getSql(); - String project = sqlRequest.getProject(); - logger.info("Using project: " + project); - logger.info("The original query: " + sql); - - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - String serverMode = kylinConfig.getServerMode(); - if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) { - throw new InternalErrorException("Query is not allowed in " + serverMode + " mode."); - } - - if (!sql.toLowerCase().contains("select")) { - logger.debug("Directly return exception as not supported"); - throw new InternalErrorException("Not Supported SQL."); - } - - long startTime = System.currentTimeMillis(); - - SQLResponse sqlResponse = null; - boolean queryCacheEnabled = kylinConfig.isQueryCacheEnabled() && !BackdoorToggles.getDisableCache(); - if (queryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); - } - - try { - if (null == sqlResponse) { - sqlResponse = queryService.query(sqlRequest); - - long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); - long scancountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // - String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), String.valueOf(sqlResponse.getTotalScanCount())); - if (queryCacheEnabled && !sqlResponse.getIsException() // - && (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) { - cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new 
Element(sqlRequest, sqlResponse)); - } - } else { - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - } - - checkQueryAuth(sqlResponse); - - } catch (Throwable e) { // calcite may throw AssertError - logger.error("Exception when execute sql", e); - String errMsg = QueryUtil.makeErrorMsgUserFriendly(e); - - sqlResponse = new SQLResponse(null, null, 0, true, errMsg); - - // for exception queries, only cache ScanOutOfLimitException - if (queryCacheEnabled && e instanceof ScanOutOfLimitException) { - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - exceptionCache.put(new Element(sqlRequest, sqlResponse)); - } - } - - queryService.logQuery(sqlRequest, sqlResponse); - - QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); - - if (sqlResponse.getIsException()) - throw new InternalErrorException(sqlResponse.getExceptionMessage()); - - return sqlResponse; - - } finally { - BackdoorToggles.cleanToggles(); - } - } - - private SQLResponse searchQueryInCache(SQLRequest sqlRequest) { - SQLResponse response = null; - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE); - - if (exceptionCache.get(sqlRequest) != null) { - logger.info("The sqlResponse is found in EXCEPTION_QUERY_CACHE"); - Element element = exceptionCache.get(sqlRequest); - response = (SQLResponse) element.getObjectValue(); - response.setHitExceptionCache(true); - } else if (successCache.get(sqlRequest) != null) { - logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE"); - Element element = successCache.get(sqlRequest); - response = (SQLResponse) element.getObjectValue(); - response.setStorageCacheUsed(true); - } - - return response; - } - - private void checkQueryAuth(SQLResponse sqlResponse) throws AccessDeniedException { - if (!sqlResponse.getIsException() && KylinConfig.getInstanceFromEnv().isQuerySecureEnabled()) { - queryService.checkAuthorization(sqlResponse.getCube()); - } - 
} - public void setQueryService(QueryService queryService) { this.queryService = queryService; } - - public void setCacheManager(CacheManager cacheManager) { - this.cacheManager = cacheManager; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/44cf9fba/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java index c9c2dd7..0938e95 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -36,7 +36,6 @@ import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.schema.OLAPSchemaFactory; -import org.apache.kylin.rest.controller.QueryController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -119,8 +118,8 @@ public class CacheService extends BasicService { protected void cleanDataCache(String project) { if (cacheManager != null) { logger.info("cleaning cache for project" + project + " (currently remove all entries)"); - cacheManager.getCache(QueryController.SUCCESS_QUERY_CACHE).removeAll(); - cacheManager.getCache(QueryController.EXCEPTION_QUERY_CACHE).removeAll(); + cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll(); + cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll(); } else { logger.warn("skip cleaning cache for project " + project); } http://git-wip-us.apache.org/repos/asf/kylin/blob/44cf9fba/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index cda4a52..a7ac4b6 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.PostConstruct; import javax.sql.DataSource; import org.apache.calcite.avatica.ColumnMetaData.Rep; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.cube.CubeInstance; @@ -60,6 +62,8 @@ import org.apache.kylin.metadata.project.RealizationEntry; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.metrics.QueryMetricsFacade; import org.apache.kylin.rest.model.ColumnMeta; import org.apache.kylin.rest.model.Query; import org.apache.kylin.rest.model.SelectedColumnMeta; @@ -69,6 +73,7 @@ import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.QueryUtil; import org.apache.kylin.rest.util.Serializer; +import org.apache.kylin.storage.exception.ScanOutOfLimitException; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -80,8 +85,13 @@ import org.springframework.security.core.GrantedAuthority; import 
org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Element; + /** * @author xduo */ @@ -90,14 +100,14 @@ public class QueryService extends BasicService { private static final Logger logger = LoggerFactory.getLogger(QueryService.class); - @Autowired - private CacheService cacheService; - public static final String USER_QUERY_FAMILY = "q"; private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata"; private static final String USER_TABLE_NAME = "_user"; private static final String USER_QUERY_COLUMN = "c"; + public static final String SUCCESS_QUERY_CACHE = "StorageCache"; + public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache"; + private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class); private final BadQueryDetector badQueryDetector = new BadQueryDetector(); @@ -105,6 +115,17 @@ public class QueryService extends BasicService { private final String tableNameBase; private final String userTableName; + @Autowired + private CacheManager cacheManager; + + @Autowired + private CacheService cacheService; + + @PostConstruct + public void init() throws IOException { + Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet"); + } + public QueryService() { String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl(); // split TABLE@HBASE_URL @@ -292,6 +313,106 @@ public class QueryService extends BasicService { } } + public SQLResponse doQueryWithCache(SQLRequest sqlRequest) { + try { + BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles()); + + String sql = sqlRequest.getSql(); + String project = sqlRequest.getProject(); + logger.info("Using project: " + project); + logger.info("The original query: " + sql); + + KylinConfig kylinConfig = 
KylinConfig.getInstanceFromEnv(); + String serverMode = kylinConfig.getServerMode(); + if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) { + throw new InternalErrorException("Query is not allowed in " + serverMode + " mode."); + } + + if (!sql.toLowerCase().contains("select")) { + logger.debug("Directly return exception as not supported"); + throw new InternalErrorException("Not Supported SQL."); + } + + long startTime = System.currentTimeMillis(); + + SQLResponse sqlResponse = null; + boolean queryCacheEnabled = kylinConfig.isQueryCacheEnabled() && !BackdoorToggles.getDisableCache(); + if (queryCacheEnabled) { + sqlResponse = searchQueryInCache(sqlRequest); + } + + try { + if (null == sqlResponse) { + sqlResponse = query(sqlRequest); + + long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); + long scancountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // + String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), String.valueOf(sqlResponse.getTotalScanCount())); + if (queryCacheEnabled && !sqlResponse.getIsException() // + && (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) { + cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse)); + } + } else { + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + } + + checkQueryAuth(sqlResponse); + + } catch (Throwable e) { // calcite may throw AssertError + logger.error("Exception when execute sql", e); + String errMsg = QueryUtil.makeErrorMsgUserFriendly(e); + + sqlResponse = new SQLResponse(null, null, 0, true, errMsg); + + // for exception queries, only cache ScanOutOfLimitException + if (queryCacheEnabled && e instanceof 
ScanOutOfLimitException) { + Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); + exceptionCache.put(new Element(sqlRequest, sqlResponse)); + } + } + + logQuery(sqlRequest, sqlResponse); + + QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); + + if (sqlResponse.getIsException()) + throw new InternalErrorException(sqlResponse.getExceptionMessage()); + + return sqlResponse; + + } finally { + BackdoorToggles.cleanToggles(); + } + } + + public SQLResponse searchQueryInCache(SQLRequest sqlRequest) { + SQLResponse response = null; + Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); + Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE); + + if (exceptionCache.get(sqlRequest) != null) { + logger.info("The sqlResponse is found in EXCEPTION_QUERY_CACHE"); + Element element = exceptionCache.get(sqlRequest); + response = (SQLResponse) element.getObjectValue(); + response.setHitExceptionCache(true); + } else if (successCache.get(sqlRequest) != null) { + logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE"); + Element element = successCache.get(sqlRequest); + response = (SQLResponse) element.getObjectValue(); + response.setStorageCacheUsed(true); + } + + return response; + } + + private void checkQueryAuth(SQLResponse sqlResponse) throws AccessDeniedException { + if (!sqlResponse.getIsException() && KylinConfig.getInstanceFromEnv().isQuerySecureEnabled()) { + checkAuthorization(sqlResponse.getCube()); + } + } + private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception { String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); final Collection<? 
extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext().getAuthentication().getAuthorities(); @@ -368,7 +489,7 @@ public class QueryService extends BasicService { tableMap.get(colmnMeta.getTABLE_SCHEM() + "#" + colmnMeta.getTABLE_NAME()).addColumn(colmnMeta); } } - + } finally { close(columnMeta, null, conn); if (JDBCTableMeta != null) { @@ -541,4 +662,7 @@ public class QueryService extends BasicService { DBUtils.closeQuietly(conn); } + public void setCacheManager(CacheManager cacheManager) { + this.cacheManager = cacheManager; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/44cf9fba/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java index 3180075..e84235b 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java @@ -49,7 +49,7 @@ public class QueryControllerTest extends ServiceTestBase { queryController = new QueryController(); queryController.setQueryService(queryService); - queryController.setCacheManager(cacheManager); + queryService.setCacheManager(cacheManager); } @Test(expected = Exception.class)