KYLIN-1127 Adopt listener pattern to wipe query cache on cube update
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a3397d04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a3397d04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a3397d04 Branch: refs/heads/2.x-staging Commit: a3397d044d6cd333d8f9d94bfd75aa603180d67e Parents: 2f4595a Author: Yang Li <liy...@apache.org> Authored: Mon Nov 9 11:53:43 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Nov 9 11:53:43 2015 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 34 ++++++++-- .../kylin/storage/hybrid/HybridManager.java | 4 +- .../apache/kylin/rest/service/CacheService.java | 68 +++++++++++++++----- .../kylin/rest/service/CacheServiceTest.java | 2 + 4 files changed, 85 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index a249cd7..89873d2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -236,6 +236,9 @@ public class CubeManager implements IRealizationProvider { // delete cube from project ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName); + + if (listener != null) + listener.afterCubeDelete(cube); return cube; } @@ -248,14 +251,22 @@ public class CubeManager implements IRealizationProvider { CubeInstance cube = CubeInstance.create(cubeName, projectName, desc); cube.setOwner(owner); - updateCube(new CubeUpdate(cube)); + updateCubeWithRetry(new CubeUpdate(cube), 0); ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner); + if (listener != null) + listener.afterCubeCreate(cube); + return cube; } public CubeInstance updateCube(CubeUpdate update) throws IOException { - return updateCube(update, 0); + CubeInstance cube = updateCubeWithRetry(update, 0); + + if (listener != null) + listener.afterCubeUpdate(cube); + + return cube; } private boolean validateReadySegments(CubeInstance cube) { @@ -283,7 +294,7 @@ public class CubeManager implements IRealizationProvider { return true; } - private CubeInstance updateCube(CubeUpdate update, int retry) throws IOException { + private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException { if (update == null || update.getCubeInstance() == null) throw new IllegalStateException(); @@ -348,7 +359,7 @@ public class CubeManager implements IRealizationProvider { cube = reloadCubeLocal(cube.getName()); update.setCubeInstance(cube); retry++; - cube = updateCube(update, retry); + cube = updateCubeWithRetry(update, retry); } if (toRemoveResources.size() > 0) { @@ -850,4 +861,19 @@ public class CubeManager implements IRealizationProvider { return getCube(name); } + // ============================================================================ + + public interface CubeChangeListener { + void afterCubeCreate(CubeInstance cube); + + void afterCubeUpdate(CubeInstance cube); + + void afterCubeDelete(CubeInstance cube); + } + + private CubeChangeListener listener; + + public void setCubeChangeListener(CubeChangeListener listener) { + this.listener = listener; + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java index 9392ef5..5f16b6b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java @@ -98,7 +98,7 @@ public class HybridManager implements IRealizationProvider { logger.debug("Loaded " + paths.size() + " Hybrid(s)"); } - public void reloadHybridInstanceByChild(RealizationType type, String realizationName) throws IOException { + public void reloadHybridInstanceByChild(RealizationType type, String realizationName) { for (HybridInstance hybridInstance : hybridMap.values()) { boolean includes = false; for (IRealization realization : hybridInstance.getRealizations()) { @@ -113,7 +113,7 @@ public class HybridManager implements IRealizationProvider { } } - private synchronized HybridInstance loadHybridInstance(String path) throws IOException { + private synchronized HybridInstance loadHybridInstance(String path) { ResourceStore store = getStore(); HybridInstance hybridInstance = null; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index f9c3ec1..8371907 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import javax.annotation.PostConstruct; import javax.sql.DataSource; import net.sf.ehcache.CacheManager; @@ -72,6 +73,28 @@ public class CacheService extends BasicService { @Autowired private CacheManager cacheManager; + @PostConstruct + public void initCubeChangeListener() throws IOException { + CubeManager cubeMgr = CubeManager.getInstance(getConfig()); + cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() { + + @Override + public void afterCubeCreate(CubeInstance cube) { + // no cache need change + } + + @Override + public void afterCubeUpdate(CubeInstance cube) { + rebuildCubeCache(cube.getName()); + } + + @Override + public void afterCubeDelete(CubeInstance cube) { + removeCubeCache(cube.getName(), cube); + } + }); + } + // for test public void setCubeService(CubeService cubeService) { this.cubeService = cubeService; @@ -151,14 +174,7 @@ public class CacheService extends BasicService { try { switch (cacheType) { case CUBE: - CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey); - getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey); - getProjectManager().clearL2Cache(); - //clean query related cache first - if (newCube != null) { - cleanDataCache(newCube.getUuid()); - } - cubeService.updateOnNewSegmentReady(cacheKey); + rebuildCubeCache(cacheKey); break; case STREAMING: getStreamingManager().reloadStreamingConfigLocal(cacheKey); @@ -170,10 +186,8 @@ public class CacheService extends BasicService { getCubeDescManager().reloadCubeDescLocal(cacheKey); break; case PROJECT: - ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey); - if (projectInstance != null) { - removeOLAPDataSource(projectInstance.getName()); - } + getProjectManager().reloadProjectLocal(cacheKey); + removeOLAPDataSource(cacheKey); break; case INVERTED_INDEX: //II update does not need to update storage cache because it is dynamic already @@ -217,16 +231,23 @@ public class CacheService extends BasicService { } } + private void rebuildCubeCache(String cubeName) { + CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName); + getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName); + getProjectManager().clearL2Cache(); + //clean query related cache first + if (cube != null) { + cleanDataCache(cube.getUuid()); + } + cubeService.updateOnNewSegmentReady(cubeName); + } + public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) { final String log = "remove cache type: " + cacheType + " name:" + cacheKey; try { switch (cacheType) { case CUBE: - CubeInstance cube = getCubeManager().getCube(cacheKey); - getCubeManager().removeCubeLocal(cacheKey); - if (cube != null) { - cleanDataCache(cube.getUuid()); - } + removeCubeCache(cacheKey, null); break; case CUBE_DESC: getCubeDescManager().removeLocalCubeDesc(cacheKey); @@ -251,4 +272,17 @@ public class CacheService extends BasicService { throw new RuntimeException("error " + log, e); } } + + private void removeCubeCache(String cubeName, CubeInstance cube) { + // you may not get the cube instance if it's already removed from metadata + if (cube == null) { + cube = getCubeManager().getCube(cubeName); + } + + getCubeManager().removeCubeLocal(cubeName); + + if (cube != null) { + cleanDataCache(cube.getUuid()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index b90c10a..25b131a 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -108,7 +108,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { }; serviceA.setCubeService(cubeServiceA); + serviceA.initCubeChangeListener(); serviceB.setCubeService(cubeServiceB); + serviceB.initCubeChangeListener(); context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() { @Override