Repository: incubator-kylin Updated Branches: refs/heads/2.x-staging f9f96404f -> a3397d044
KYLIN-1127 Move cache related stuff from BasicService into CacheService Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f1dedab7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f1dedab7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f1dedab7 Branch: refs/heads/2.x-staging Commit: f1dedab768f52b7dcb22e16b1897167703897fc4 Parents: f9f9640 Author: Yang Li <liy...@apache.org> Authored: Mon Nov 9 10:41:59 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Nov 9 10:41:59 2015 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 8 +- .../engine/streaming/cli/StreamingCLI.java | 2 - .../kylin/rest/controller/JobController.java | 2 +- .../rest/controller/StreamingController.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 129 +--------- .../apache/kylin/rest/service/CacheService.java | 257 +++++++++++++------ .../apache/kylin/rest/service/QueryService.java | 32 ++- .../kylin/rest/service/StreamingService.java | 12 +- .../kylin/rest/service/CacheServiceTest.java | 2 +- .../kylin/rest/service/CubeServiceTest.java | 7 +- .../kylin/rest/service/JobServiceTest.java | 7 +- .../kylin/rest/service/QueryServiceTest.java | 7 +- .../kylin/rest/service/ServiceTestBase.java | 3 +- 13 files changed, 236 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 2232f01..9b7a024 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -525,11 +525,7 @@ public class CubeManager implements IRealizationProvider { * @param cubeName */ public CubeInstance reloadCubeLocal(String cubeName) { - try { - return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); - } catch (IOException e) { - throw new RuntimeException(e); - } + return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName)); } public void removeCubeLocal(String cubeName) { @@ -792,7 +788,7 @@ public class CubeManager implements IRealizationProvider { logger.debug("Loaded " + paths.size() + " Cube(s)"); } - private synchronized CubeInstance reloadCubeLocalAt(String path) throws IOException { + private synchronized CubeInstance reloadCubeLocalAt(String path) { ResourceStore store = getStore(); CubeInstance cubeInstance; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index 3b1693a..e3a7133 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -56,8 +56,6 @@ public class StreamingCLI { public static void main(String[] args) { try { - AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater()); - Preconditions.checkArgument(args[0].equals("streaming")); Preconditions.checkArgument(args[1].equals("start")); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java index 6fb2813..f6323ed 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -71,7 +71,7 @@ public class JobController extends BasicController implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { - String timeZone = jobService.getKylinConfig().getTimeZone(); + String timeZone = jobService.getConfig().getTimeZone(); TimeZone tzone = TimeZone.getTimeZone(timeZone); TimeZone.setDefault(tzone); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index 78b63c0..e22bd30 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -158,7 +158,7 @@ public class StreamingController extends BasicController { @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE }) @ResponseBody public void deleteConfig(@PathVariable String configName) throws IOException { - StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName); + StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(configName); KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName); if (null == config) { throw new NotFoundException("StreamingConfig with name " + configName + " not found.."); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java index 5ac12ea..9135dfa 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -18,27 +18,12 @@ package org.apache.kylin.rest.service; -import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.EnumSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.sql.DataSource; - -import net.sf.ehcache.CacheManager; - -import org.apache.calcite.jdbc.Driver; -import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeManager; @@ -54,104 +39,17 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.enumerator.OLAPQuery; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.storage.hybrid.HybridManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.datasource.DriverManagerDataSource; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; -import com.google.common.io.Files; public abstract class BasicService { - private static final Logger logger = LoggerFactory.getLogger(BasicService.class); - - private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>(); - - @Autowired - private CacheManager cacheManager; - public KylinConfig getConfig() { - return KylinConfig.getInstanceFromEnv(); - } - - protected void cleanDataCache(String storageUUID) { - if (cacheManager != null && cacheManager.getCache(storageUUID) != null) { - logger.info("cleaning cache for " + storageUUID); - cacheManager.getCache(storageUUID).removeAll(); - } else { - logger.warn("skip cleaning cache for " + storageUUID); - } - } - - protected void cleanAllDataCache() { - if (cacheManager != null) { - logger.warn("cleaning all storage cache"); - cacheManager.clearAll(); - } else { - logger.warn("skip cleaning all storage cache"); - } - } - - public void removeOLAPDataSource(String project) { - logger.info("removeOLAPDataSource is called for project " + project); - if (StringUtils.isEmpty(project)) - throw new IllegalArgumentException("removeOLAPDataSource: project name not given"); - - project = ProjectInstance.getNormalizedProjectName(project); - olapDataSources.remove(project); - } - - public static void removeAllOLAPDataSources() { - // brutal, yet simplest way - logger.info("removeAllOLAPDataSources is called."); - olapDataSources.clear(); - } - - public DataSource getOLAPDataSource(String project) { - - project = ProjectInstance.getNormalizedProjectName(project); - - DataSource ret = olapDataSources.get(project); - if (ret == null) { - logger.debug("Creating a new data source"); - logger.debug("OLAP data source pointing to " + getConfig()); - - File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig()); - - try { - List<String> text = Files.readLines(modelJson, Charset.defaultCharset()); - logger.debug("The new temp olap json is :"); - for (String line : text) - logger.debug(line); - } catch (IOException e) { - e.printStackTrace(); // logging failure is not critical - } - - DriverManagerDataSource ds = new DriverManagerDataSource(); - Properties props = new Properties(); - props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold())); - ds.setConnectionProperties(props); - ds.setDriverClassName(Driver.class.getName()); - ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath()); - - ret = olapDataSources.putIfAbsent(project, ds); - if (ret == null) { - ret = ds; - } - } - return ret; - } - - public final KylinConfig getKylinConfig() { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); if (kylinConfig == null) { @@ -160,7 +58,7 @@ public abstract class BasicService { return kylinConfig; } - + public final MetadataManager getMetadataManager() { return MetadataManager.getInstance(getConfig()); } @@ -169,7 +67,7 @@ public abstract class BasicService { return CubeManager.getInstance(getConfig()); } - public final StreamingManager getSreamingManager() { + public final StreamingManager getStreamingManager() { return StreamingManager.getInstance(getConfig()); } @@ -246,27 +144,4 @@ public abstract class BasicService { return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs()); } - protected static void close(ResultSet resultSet, Statement stat, Connection conn) { - OLAPContext.clearParameter(); - - if (resultSet != null) - try { - resultSet.close(); - } catch (SQLException e) { - logger.error("failed to close", e); - } - if (stat != null) - try { - stat.close(); - } catch (SQLException e) { - logger.error("failed to close", e); - } - if (conn != null) - try { - conn.close(); - } catch (SQLException e) { - logger.error("failed to close", e); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index 63fe8c4..f9c3ec1 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -18,8 +18,21 @@ package org.apache.kylin.rest.service; +import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.sql.DataSource; + +import net.sf.ehcache.CacheManager; + +import org.apache.calcite.jdbc.Driver; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; @@ -32,88 +45,172 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.RealizationRegistry; import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.query.enumerator.OLAPQuery; +import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.storage.hybrid.HybridManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.springframework.stereotype.Component; +import com.google.common.io.Files; + /** */ @Component("cacheService") public class CacheService extends BasicService { + private static final Logger logger = LoggerFactory.getLogger(CacheService.class); + + private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>(); + @Autowired private CubeService cubeService; + @Autowired + private CacheManager cacheManager; + + // for test public void setCubeService(CubeService cubeService) { this.cubeService = cubeService; } - private static final Logger logger = LoggerFactory.getLogger(CacheService.class); + protected void cleanDataCache(String storageUUID) { + if (cacheManager != null && cacheManager.getCache(storageUUID) != null) { + logger.info("cleaning cache for " + storageUUID); + cacheManager.getCache(storageUUID).removeAll(); + } else { + logger.warn("skip cleaning cache for " + storageUUID); + } + } + + protected void cleanAllDataCache() { + if (cacheManager != null) { + logger.warn("cleaning all storage cache"); + cacheManager.clearAll(); + } else { + logger.warn("skip cleaning all storage cache"); + } + } + + protected void removeOLAPDataSource(String project) { + logger.info("removeOLAPDataSource is called for project " + project); + if (StringUtils.isEmpty(project)) + throw new IllegalArgumentException("removeOLAPDataSource: project name not given"); + + project = ProjectInstance.getNormalizedProjectName(project); + olapDataSources.remove(project); + } + + public static void removeAllOLAPDataSources() { + // brutal, yet simplest way + logger.info("removeAllOLAPDataSources is called."); + olapDataSources.clear(); + } + + public DataSource getOLAPDataSource(String project) { + + project = ProjectInstance.getNormalizedProjectName(project); + + DataSource ret = olapDataSources.get(project); + if (ret == null) { + logger.debug("Creating a new data source"); + logger.debug("OLAP data source pointing to " + getConfig()); + + File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig()); + + try { + List<String> text = Files.readLines(modelJson, Charset.defaultCharset()); + logger.debug("The new temp olap json is :"); + for (String line : text) + logger.debug(line); + } catch (IOException e) { + e.printStackTrace(); // logging failure is not critical + } + + DriverManagerDataSource ds = new DriverManagerDataSource(); + Properties props = new Properties(); + props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold())); + ds.setConnectionProperties(props); + ds.setDriverClassName(Driver.class.getName()); + ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath()); + + ret = olapDataSources.putIfAbsent(project, ds); + if (ret == null) { + ret = ds; + } + } + return ret; + } public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) { final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey; logger.info(log); try { switch (cacheType) { - case CUBE: - CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey); - getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey); - getProjectManager().clearL2Cache(); - //clean query related cache first - super.cleanDataCache(newCube.getUuid()); - cubeService.updateOnNewSegmentReady(cacheKey); - break; - case STREAMING: - getSreamingManager().reloadStreamingConfigLocal(cacheKey); - break; - case KAFKA: - getKafkaManager().reloadKafkaConfigLocal(cacheKey); - break; - case CUBE_DESC: - getCubeDescManager().reloadCubeDescLocal(cacheKey); - break; - case PROJECT: - ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey); + case CUBE: + CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey); + getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey); + getProjectManager().clearL2Cache(); + //clean query related cache first + if (newCube != null) { + cleanDataCache(newCube.getUuid()); + } + cubeService.updateOnNewSegmentReady(cacheKey); + break; + case STREAMING: + getStreamingManager().reloadStreamingConfigLocal(cacheKey); + break; + case KAFKA: + getKafkaManager().reloadKafkaConfigLocal(cacheKey); + break; + case CUBE_DESC: + getCubeDescManager().reloadCubeDescLocal(cacheKey); + break; + case PROJECT: + ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey); + if (projectInstance != null) { removeOLAPDataSource(projectInstance.getName()); - break; - case INVERTED_INDEX: - //II update does not need to update storage cache because it is dynamic already - getIIManager().reloadIILocal(cacheKey); - getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey); - getProjectManager().clearL2Cache(); - break; - case INVERTED_INDEX_DESC: - getIIDescManager().reloadIIDescLocal(cacheKey); - break; - case TABLE: - getMetadataManager().reloadTableCache(cacheKey); - IIDescManager.clearCache(); - CubeDescManager.clearCache(); - break; - case DATA_MODEL: - getMetadataManager().reloadDataModelDesc(cacheKey); - IIDescManager.clearCache(); - CubeDescManager.clearCache(); - break; - case ALL: - MetadataManager.clearCache(); - CubeDescManager.clearCache(); - CubeManager.clearCache(); - IIDescManager.clearCache(); - IIManager.clearCache(); - HybridManager.clearCache(); - RealizationRegistry.clearCache(); - ProjectManager.clearCache(); - KafkaConfigManager.clearCache(); - StreamingManager.clearCache(); - super.cleanAllDataCache(); - BasicService.removeAllOLAPDataSources(); - break; - default: - throw new RuntimeException("invalid cacheType:" + cacheType); + } + break; + case INVERTED_INDEX: + //II update does not need to update storage cache because it is dynamic already + getIIManager().reloadIILocal(cacheKey); + getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey); + getProjectManager().clearL2Cache(); + break; + case INVERTED_INDEX_DESC: + getIIDescManager().reloadIIDescLocal(cacheKey); + break; + case TABLE: + getMetadataManager().reloadTableCache(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; + case DATA_MODEL: + getMetadataManager().reloadDataModelDesc(cacheKey); + IIDescManager.clearCache(); + CubeDescManager.clearCache(); + break; + case ALL: + MetadataManager.clearCache(); + CubeDescManager.clearCache(); + CubeManager.clearCache(); + IIDescManager.clearCache(); + IIManager.clearCache(); + HybridManager.clearCache(); + RealizationRegistry.clearCache(); + ProjectManager.clearCache(); + KafkaConfigManager.clearCache(); + StreamingManager.clearCache(); + + cleanAllDataCache(); + removeAllOLAPDataSources(); + break; + default: + throw new RuntimeException("invalid cacheType:" + cacheType); } } catch (IOException e) { throw new RuntimeException("error " + log, e); @@ -124,31 +221,31 @@ public class CacheService extends BasicService { final String log = "remove cache type: " + cacheType + " name:" + cacheKey; try { switch (cacheType) { - case CUBE: - if (getCubeManager().getCube(cacheKey) != null) { - String storageUUID = getCubeManager().getCube(cacheKey).getUuid(); - getCubeManager().removeCubeLocal(cacheKey); - super.cleanDataCache(storageUUID); + case CUBE: + CubeInstance cube = getCubeManager().getCube(cacheKey); + getCubeManager().removeCubeLocal(cacheKey); + if (cube != null) { + cleanDataCache(cube.getUuid()); } - break; - case CUBE_DESC: - getCubeDescManager().removeLocalCubeDesc(cacheKey); - break; - case PROJECT: - ProjectManager.clearCache(); - break; - case INVERTED_INDEX: - getIIManager().removeIILocal(cacheKey); - break; - case INVERTED_INDEX_DESC: - getIIDescManager().removeIIDescLocal(cacheKey); - break; - case TABLE: - throw new UnsupportedOperationException(log); - case DATA_MODEL: - throw new UnsupportedOperationException(log); - default: - throw new RuntimeException("invalid cacheType:" + cacheType); + break; + case CUBE_DESC: + getCubeDescManager().removeLocalCubeDesc(cacheKey); + break; + case PROJECT: + ProjectManager.clearCache(); + break; + case INVERTED_INDEX: + getIIManager().removeIILocal(cacheKey); + break; + case INVERTED_INDEX_DESC: + getIIDescManager().removeIIDescLocal(cacheKey); + break; + case TABLE: + throw new UnsupportedOperationException(log); + case DATA_MODEL: + throw new UnsupportedOperationException(log); + default: + throw new RuntimeException("invalid cacheType:" + cacheType); } } catch (IOException e) { throw new RuntimeException("error " + log, e); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 4e96360..a1e8a29 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -68,6 +68,7 @@ import org.apache.kylin.storage.hbase.HBaseConnection; import org.h2.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.AccessDeniedException; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.core.context.SecurityContextHolder; @@ -81,6 +82,9 @@ public class QueryService extends BasicService { private static final Logger logger = LoggerFactory.getLogger(QueryService.class); + @Autowired + private CacheService cacheService; + public static final String USER_QUERY_FAMILY = "q"; private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata"; private static final String USER_TABLE_NAME = "_user"; @@ -282,7 +286,7 @@ public class QueryService extends BasicService { return Collections.emptyList(); } try { - DataSource dataSource = getOLAPDataSource(project); + DataSource dataSource = cacheService.getOLAPDataSource(project); conn = dataSource.getConnection(); DatabaseMetaData metaData = conn.getMetaData(); @@ -342,7 +346,7 @@ public class QueryService extends BasicService { List<SelectedColumnMeta> columnMetas = new LinkedList<SelectedColumnMeta>(); try { - conn = getOLAPDataSource(sqlRequest.getProject()).getConnection(); + conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection(); if (sqlRequest instanceof PrepareSqlRequest) { PreparedStatement preparedState = conn.prepareStatement(sql); @@ -481,4 +485,28 @@ public class QueryService extends BasicService { return -1; } } + + private static void close(ResultSet resultSet, Statement stat, Connection conn) { + OLAPContext.clearParameter(); + + if (resultSet != null) + try { + resultSet.close(); + } catch (SQLException e) { + logger.error("failed to close", e); + } + if (stat != null) + try { + stat.close(); + } catch (SQLException e) { + logger.error("failed to close", e); + } + if (conn != null) + try { + conn.close(); + } catch (SQLException e) { + logger.error("failed to close", e); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java index 0f67ac8..e40426b 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -41,9 +41,9 @@ public class StreamingService extends BasicService { List<StreamingConfig> streamingConfigs = new ArrayList(); CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null; if (null == cubeInstance) { - streamingConfigs = getSreamingManager().listAllStreaming(); + streamingConfigs = getStreamingManager().listAllStreaming(); } else { - for(StreamingConfig config : getSreamingManager().listAllStreaming()){ + for(StreamingConfig config : getStreamingManager().listAllStreaming()){ if(cubeInstance.getName().equals(config.getCubeName())){ streamingConfigs.add(config); } @@ -70,21 +70,21 @@ public class StreamingService extends BasicService { } public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException { - if (getSreamingManager().getStreamingConfig(config.getName()) != null) { + if (getStreamingManager().getStreamingConfig(config.getName()) != null) { throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists"); } - StreamingConfig streamingConfig = getSreamingManager().saveStreamingConfig(config); + StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config); return streamingConfig; } // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException { - return getSreamingManager().updateStreamingConfig(config); + return getStreamingManager().updateStreamingConfig(config); } // @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')") public void dropStreamingConfig(StreamingConfig config) throws IOException { - getSreamingManager().removeStreamingConfig(config); + getStreamingManager().removeStreamingConfig(config); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index 88d6273..af1bbc0 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -248,7 +248,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { CubeUpdate cubeBuilder = new CubeUpdate(cube); cubeBuilder.setToAddSegs(segment); cube = cubeManager.updateCube(cubeBuilder); - //one for cube update, one for project update + //one for cube update assertEquals(1, broadcaster.getCounterAndClear()); waitForCounterAndClear(1); assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java index 7ab08f8..226380c 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java @@ -38,12 +38,15 @@ public class CubeServiceTest extends ServiceTestBase { @Autowired CubeService cubeService; + @Autowired + private CacheService cacheService; + @Test public void testBasics() throws JsonProcessingException, JobException, UnknownHostException { Assert.assertNotNull(cubeService.getConfig()); - Assert.assertNotNull(cubeService.getKylinConfig()); + Assert.assertNotNull(cubeService.getConfig()); Assert.assertNotNull(cubeService.getMetadataManager()); - Assert.assertNotNull(cubeService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); + Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); Assert.assertTrue(CubeService.getCubeDescNameFromCube("testCube").equals("testCube_desc")); Assert.assertTrue(CubeService.getCubeNameFromDesc("testCube_desc").equals("testCube")); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index a53dd3c..41057e5 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -34,12 +34,15 @@ public class JobServiceTest extends ServiceTestBase { @Autowired JobService jobService; + @Autowired + private CacheService cacheService; + @Test public void testBasics() throws JobException, IOException { Assert.assertNotNull(jobService.getConfig()); - Assert.assertNotNull(jobService.getKylinConfig()); + Assert.assertNotNull(jobService.getConfig()); Assert.assertNotNull(jobService.getMetadataManager()); - Assert.assertNotNull(jobService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); + Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); Assert.assertNull(jobService.getJobInstance("job_not_exist")); Assert.assertNotNull(jobService.listAllJobs(null, null, null)); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 44e5ef6..cc86e1e 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -37,12 +37,15 @@ public class QueryServiceTest extends ServiceTestBase { @Autowired QueryService queryService; + @Autowired + private CacheService cacheService; + @Test public void testBasics() throws JobException, IOException, SQLException { Assert.assertNotNull(queryService.getConfig()); - Assert.assertNotNull(queryService.getKylinConfig()); + Assert.assertNotNull(queryService.getConfig()); Assert.assertNotNull(queryService.getMetadataManager()); - Assert.assertNotNull(queryService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); + Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME)); // Assert.assertTrue(queryService.getQueries("ADMIN").size() == 0); // http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java index fd613f4..f8dc945 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java +++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java @@ -69,8 +69,7 @@ public class ServiceTestBase extends LocalFileMetadataTestCase { IIManager.clearCache(); RealizationRegistry.clearCache(); ProjectManager.clearCache(); - BasicService.removeAllOLAPDataSources(); - + CacheService.removeAllOLAPDataSources(); } @After