KYLIN-2672 Only clean necessary cache for CubeMigrationCLI

Signed-off-by: Li Yang <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a88403ae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a88403ae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a88403ae

Branch: refs/heads/2.3.x
Commit: a88403ae0050f6b1b2f903534f2330f03996603c
Parents: b4b25ec
Author: kangkaisen <kangkai...@meituan.com>
Authored: Mon Dec 25 18:33:31 2017 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Fri Feb 9 21:25:19 2018 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/RestClient.java     | 20 +++++++
 .../java/org/apache/kylin/cube/CubeManager.java |  2 +-
 .../kylin/metadata/TableMetadataManager.java    | 12 ++++
 .../apache/kylin/metadata/model/TableDesc.java  |  2 +-
 .../kylin/metadata/project/ProjectL2Cache.java  |  4 ++
 .../kylin/metadata/project/ProjectManager.java  |  6 +-
 .../kylin/rest/controller/CacheController.java  | 10 ++++
 .../rest/request/CubeMigrationRequest.java      | 62 ++++++++++++++++++++
 .../apache/kylin/rest/service/CacheService.java | 34 ++++++++++-
 .../org/apache/kylin/tool/CubeMigrationCLI.java | 28 ++++++---
 10 files changed, 168 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 02045ae..e1cc13c 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -274,6 +274,26 @@ public class RestClient {
         return response;
     }
 
+    public void clearCacheForCubeMigration(String cube, String project, String 
model, Map<String, String> tableToProjects) throws IOException{
+        String url = baseUrl + "/cache/migration";
+        HttpPost post = new HttpPost(url);
+
+        post.addHeader("Accept", "application/json, text/plain, */*");
+        post.addHeader("Content-Type", "application/json");
+
+        HashMap<String, Object> paraMap = new HashMap<String, Object>();
+        paraMap.put("cube", cube);
+        paraMap.put("project", project);
+        paraMap.put("model", model);
+        paraMap.put("tableToProjects", tableToProjects);
+        String jsonMsg = JsonUtil.writeValueAsString(paraMap);
+        post.setEntity(new StringEntity(jsonMsg, "UTF-8"));
+        HttpResponse response = client.execute(post);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + 
response.getStatusLine().getStatusCode());
+        }
+    }
+
     private HashMap dealResponse(HttpResponse response) throws IOException {
         if (response.getStatusLine().getStatusCode() != 200) {
             throw new IOException("Invalid response " + 
response.getStatusLine().getStatusCode());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 8bdb5aa..9c52e8b 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -386,7 +386,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     // for internal
-    CubeInstance reloadCubeQuietly(String cubeName) {
+    public CubeInstance reloadCubeQuietly(String cubeName) {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = crud.reloadQuietly(cubeName);
             if (cube != null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index f09c47c..2308df4 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -151,6 +151,12 @@ public class TableMetadataManager {
         }
     }
 
+    public void reloadSourceTable(String table, String project) {
+        try (AutoLock lock = srcTableMapLock.lockForWrite()) {
+            srcTableCrud.reloadQuietly(TableDesc.concatResourcePath(table, 
project));
+        }
+    }
+
     public List<TableDesc> listAllTables(String prj) {
         try (AutoLock lock = srcTableMapLock.lockForWrite()) {
             return Lists.newArrayList(getAllTablesMap(prj).values());
@@ -314,6 +320,12 @@ public class TableMetadataManager {
         }
     }
 
+    public void reloadTableExt(String table, String project) {
+        try (AutoLock lock = srcExtMapLock.lockForWrite()) {
+            srcExtCrud.reloadQuietly(TableExtDesc.concatResourcePath(table, 
project));
+        }
+    }
+
     /**
      * Get table extended info. Keys are defined in {@link MetadataConstants}
      * 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 68bc5e9..be278de 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -57,7 +57,7 @@ public class TableDesc extends RootPersistentEntity 
implements ISourceAware {
 
     // this method should only used for getting dest path when copying from 
src to dest.
     // if you want to get table's src path, use getResourcePath() instead.
-    private static String concatResourcePath(String tableIdentity, String prj) 
{
+    public static String concatResourcePath(String tableIdentity, String prj) {
         return concatRawResourcePath(makeResourceName(tableIdentity, prj));
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
index 70b6a12..6e09ae8 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
@@ -187,6 +187,10 @@ class ProjectL2Cache {
         return result;
     }
 
+    public void reloadCacheByProject(String project) {
+        projectCaches.put(project, loadCache(project));
+    }
+
     private ProjectCache loadCache(String project) {
         logger.debug("Loading L2 project cache for " + project);
         ProjectCache projectCache = new ProjectCache(project);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 0dd364d..aae692d 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -117,10 +117,14 @@ public class ProjectManager {
         l2Cache.clear();
     }
 
+    public void reloadProjectL2Cache(String project) {
+        l2Cache.reloadCacheByProject(project);
+    }
+
     public ProjectInstance reloadProjectQuietly(String project) throws 
IOException {
         try (AutoLock lock = prjMapLock.lockForWrite()) {
             ProjectInstance prj = crud.reloadQuietly(project);
-            clearL2Cache();
+            reloadProjectL2Cache(project);
             return prj;
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
index 992094b..08b7cc4 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.rest.request.CubeMigrationRequest;
 import org.apache.kylin.rest.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
@@ -73,6 +75,14 @@ public class CacheController extends BasicController {
         cacheService.notifyMetadataChange(Broadcaster.SYNC_ALL, 
Broadcaster.Event.UPDATE, Broadcaster.SYNC_ALL);
     }
 
+    @RequestMapping(value = "/migration", method = RequestMethod.POST)
+    @ResponseBody
+    public void clearCacheForCubeMigration(@RequestBody CubeMigrationRequest 
request) throws IOException {
+        cacheService.clearCacheForCubeMigration(request.getCube(), 
request.getProject(), request.getModel(), request.getTableToProjects());
+
+        cacheService.cleanDataCache(request.getProject());
+    }
+
     public void setCacheService(CacheService cacheService) {
         this.cacheService = cacheService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
 
b/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
new file mode 100644
index 0000000..175fb59
--- /dev/null
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.Map;
+
+public class CubeMigrationRequest {
+    private String cube;
+    private String model;
+    private String project;
+
+    private Map<String, String> tableToProjects;//For KYLIN-2717 
compatibility, the project of old table will be NULL
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getCube() {
+        return cube;
+    }
+
+    public void setCube(String cube) {
+        this.cube = cube;
+    }
+
+    public String getModel() {
+        return model;
+    }
+
+    public void setModel(String model) {
+        this.model = model;
+    }
+
+    public Map<String, String> getTableToProjects() {
+        return tableToProjects;
+    }
+
+    public void setTableToProjects(Map<String, String> tableToProjects) {
+        this.tableToProjects = tableToProjects;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index b61309e..a8771ed 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
 
+import java.util.Map;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -114,7 +115,7 @@ public class CacheService extends BasicService implements 
InitializingBean {
         broadcaster.notifyListener(entity, event, cacheKey);
     }
 
-    protected void cleanDataCache(String project) {
+    public void cleanDataCache(String project) {
         if (cacheManager != null) {
             logger.info("cleaning cache for project " + project + " (currently 
remove all entries)");
             
cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
@@ -133,4 +134,35 @@ public class CacheService extends BasicService implements 
InitializingBean {
         }
     }
 
+    public void clearCacheForCubeMigration(String cube, String project, String 
model, Map<String, String> tableToProjects) throws IOException {
+        //the metadata reloading must be in order
+
+        //table must before model
+        for (Map.Entry<String, String> entry : tableToProjects.entrySet()) {
+            //For KYLIN-2717 compatibility, use tableProject not project
+            getTableManager().reloadSourceTable(entry.getKey(), 
entry.getValue());
+            getTableManager().reloadTableExt(entry.getKey(), entry.getValue());
+        }
+        logger.info("reload table cache done");
+
+        //ProjectInstance cache must before cube and model cache, as the new 
cubeDesc init and model reloading relays on latest ProjectInstance cache
+        getProjectManager().reloadProjectQuietly(project);
+        logger.info("reload project cache done");
+
+        //model must before cube desc
+        getDataModelManager().reloadDataModel(model);
+        logger.info("reload model cache done");
+
+        //cube desc must before cube instance
+        getCubeDescManager().reloadCubeDescLocal(cube);
+        logger.info("reload cubeDesc cache done");
+
+        getCubeManager().reloadCubeQuietly(cube);
+        logger.info("reload cube cache done");
+
+        //reload project l2cache again after cube cache, because the project 
L2 cache relay on latest cube cache
+        getProjectManager().reloadProjectL2Cache(project);
+        logger.info("reload project l2cache done");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a88403ae/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index a4a6ab9..5426b62 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -54,7 +54,6 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -129,8 +128,16 @@ public class CubeMigrationCLI extends AbstractApplication {
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), 
KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
-                projectName, Boolean.parseBoolean(copyAcl), 
Boolean.parseBoolean(purgeAndDisable),
-                Boolean.parseBoolean(overwriteIfExists), 
Boolean.parseBoolean(realExecute), true);
+                projectName, copyAcl, purgeAndDisable, overwriteIfExists, 
realExecute);
+    }
+
+    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String 
cubeName, String projectName, String copyAcl,
+            String purgeAndDisable, String overwriteIfExists, String 
realExecute)
+            throws IOException, InterruptedException {
+
+        moveCube(srcCfg, dstCfg, cubeName, projectName, 
Boolean.parseBoolean(copyAcl),
+                Boolean.parseBoolean(purgeAndDisable), 
Boolean.parseBoolean(overwriteIfExists),
+                Boolean.parseBoolean(realExecute), true);
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, 
String projectName, String copyAcl,
@@ -187,12 +194,12 @@ public class CubeMigrationCLI extends AbstractApplication 
{
             if (migrateSegment) {
                 checkMigrationSuccess(dstConfig, cubeName, true);
             }
-            updateMeta(dstConfig);
+            updateMeta(dstConfig, projectName, cubeName, cube.getModel());
         } else {
             showOpts();
         }
     }
-
+    
     public void checkMigrationSuccess(KylinConfig kylinConfig, String 
cubeName, Boolean ifFix) throws IOException {
         CubeMigrationCheckCLI checkCLI = new 
CubeMigrationCheckCLI(kylinConfig, ifFix);
         checkCLI.execute(cubeName);
@@ -619,7 +626,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         }
         }
     }
-
+    
     private String renameTableWithinProject(String srcItem) {
         if (dstProject != null && 
srcItem.contains(ResourceStore.TABLE_RESOURCE_ROOT)) {
             String tableIdentity = 
TableDesc.parseResourcePath(srcItem).getFirst();
@@ -631,13 +638,18 @@ public class CubeMigrationCLI extends AbstractApplication 
{
         return srcItem;
     }
 
-    private void updateMeta(KylinConfig config) {
+    private void updateMeta(KylinConfig config, String projectName, String 
cubeName, DataModelDesc model) {
         String[] nodes = config.getRestServers();
+        Map<String, String> tableToProjects = new HashMap<>();
+        for (TableRef tableRef : model.getAllTables()) {
+            tableToProjects.put(tableRef.getTableIdentity(), 
tableRef.getTableDesc().getProject());
+        }
+
         for (String node : nodes) {
             RestClient restClient = new RestClient(node);
             try {
                 logger.info("update meta cache for " + node);
-                restClient.wipeCache(Broadcaster.SYNC_ALL, 
Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL);
+                restClient.clearCacheForCubeMigration(cubeName, projectName, 
model.getName(), tableToProjects);
             } catch (IOException e) {
                 logger.error(e.getMessage());
             }

Reply via email to