KYLIN-2834 fix bug in Broadcaster, lost listener after cache wipe

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ecf4819e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ecf4819e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ecf4819e

Branch: refs/heads/master
Commit: ecf4819eb39b8d22ea82c70d6679200c0b2602bb
Parents: 26c03fe
Author: Li Yang <liy...@apache.org>
Authored: Sat Sep 2 06:49:34 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Tue Sep 5 16:58:30 2017 +0800

----------------------------------------------------------------------
 .../kylin/metadata/cachesync/Broadcaster.java   | 49 +++++++++-----
 .../metadata/cachesync/BroadcasterTest.java     | 70 ++++++++++++++++++++
 .../kylin/rest/controller/CubeController.java   |  2 +-
 .../rest/controller2/CubeControllerV2.java      |  2 +-
 .../apache/kylin/rest/service/CacheService.java | 12 ++--
 .../apache/kylin/rest/service/CubeService.java  | 27 +++-----
 6 files changed, 121 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 4b0ef57..26e6f49 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -103,8 +103,9 @@ public class Broadcaster {
 
     // 
============================================================================
 
-    private KylinConfig config;
+    static final Map<String, List<Listener>> staticListenerMap = 
Maps.newConcurrentMap();
 
+    private KylinConfig config;
     private BlockingDeque<BroadcastEvent> broadcastEvents = new 
LinkedBlockingDeque<>();
     private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
     private AtomicLong counter = new AtomicLong();
@@ -158,31 +159,40 @@ public class Broadcaster {
         });
     }
 
+    // static listener survives cache wipe and goes after normal listeners
+    public void registerStaticListener(Listener listener, String... entities) {
+        doRegisterListener(staticListenerMap, listener, entities);
+    }
+    
     public void registerListener(Listener listener, String... entities) {
-        synchronized (listenerMap) {
+        doRegisterListener(listenerMap, listener, entities);
+    }
+    
+    private static void doRegisterListener(Map<String, List<Listener>> lmap, 
Listener listener, String... entities) {
+        synchronized (lmap) {
             // ignore re-registration
-            List<Listener> all = listenerMap.get(SYNC_ALL);
+            List<Listener> all = lmap.get(SYNC_ALL);
             if (all != null && all.contains(listener)) {
                 return;
             }
 
             for (String entity : entities) {
                 if (!StringUtils.isBlank(entity))
-                    addListener(entity, listener);
+                    addListener(lmap, entity, listener);
             }
-            addListener(SYNC_ALL, listener);
-            addListener(SYNC_PRJ_SCHEMA, listener);
-            addListener(SYNC_PRJ_DATA, listener);
+            addListener(lmap, SYNC_ALL, listener);
+            addListener(lmap, SYNC_PRJ_SCHEMA, listener);
+            addListener(lmap, SYNC_PRJ_DATA, listener);
         }
     }
 
-    private void addListener(String entity, Listener listener) {
-        List<Listener> list = listenerMap.get(entity);
+    private static void addListener(Map<String, List<Listener>> lmap, String 
entity, Listener listener) {
+        List<Listener> list = lmap.get(entity);
         if (list == null) {
             list = new ArrayList<>();
+            lmap.put(entity, list);
         }
         list.add(listener);
-        listenerMap.put(entity, list);
     }
 
     public void notifyClearAll() throws IOException {
@@ -198,15 +208,19 @@ public class Broadcaster {
     }
 
     public void notifyListener(String entity, Event event, String cacheKey) 
throws IOException {
-        List<Listener> list = listenerMap.get(entity);
-        if (list == null)
+        // prevents concurrent modification exception
+        List<Listener> list = Lists.newArrayList();
+        List<Listener> l1 = listenerMap.get(entity); // normal listeners first
+        if (l1 != null)
+            list.addAll(l1);
+        List<Listener> l2 = staticListenerMap.get(entity); // static listeners 
second
+        if (l2 != null)
+            list.addAll(l2);
+        if (list.isEmpty())
             return;
 
-        logger.trace("Broadcasting metadata change: entity=" + entity + ", 
event=" + event + ", cacheKey=" + cacheKey
-                + ", listeners=" + list);
+        logger.debug("Broadcasting" + event + ", " + entity + ", " + cacheKey);
 
-        // prevents concurrent modification exception
-        list = Lists.newArrayList(list);
         switch (entity) {
         case SYNC_ALL:
             for (Listener l : list) {
@@ -233,8 +247,7 @@ public class Broadcaster {
             break;
         }
 
-        logger.debug(
-                "Done broadcasting metadata change: entity=" + entity + ", 
event=" + event + ", cacheKey=" + cacheKey);
+        logger.debug("Done broadcasting" + event + ", " + entity + ", " + 
cacheKey);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
new file mode 100644
index 0000000..88cf404
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.cachesync;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Listener;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BroadcasterTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+    
+    @Test
+    public void testBasics() throws IOException {
+        Broadcaster broadcaster = Broadcaster.getInstance(getTestConfig());
+        final AtomicInteger i = new AtomicInteger(0);
+
+        broadcaster.registerStaticListener(new Listener() {
+            @Override
+            public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                    throws IOException {
+                Assert.assertEquals(2, i.incrementAndGet());
+            }
+        }, "test");
+        
+        broadcaster.registerListener(new Listener() {
+            @Override
+            public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey)
+                    throws IOException {
+                Assert.assertEquals(1, i.incrementAndGet());
+            }
+        }, "test");
+        
+        broadcaster.notifyListener("test", Event.UPDATE, "");
+        
+        Broadcaster.staticListenerMap.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 14014fc..20cab7e 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -571,7 +571,7 @@ public class CubeController extends BasicController {
 
             // Get info of given table.
             try {
-                hr = cubeService.getHTableInfo(tableName);
+                hr = cubeService.getHTableInfo(cubeName, tableName);
             } catch (IOException e) {
                 logger.error("Failed to calcuate size of HTable \"" + 
tableName + "\".", e);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
index 292b633..e8337ab 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
@@ -460,7 +460,7 @@ public class CubeControllerV2 extends BasicController {
 
             // Get info of given table.
             try {
-                hr = cubeService.getHTableInfo(tableName);
+                hr = cubeService.getHTableInfo(cubeName, tableName);
             } catch (IOException e) {
                 logger.error("Failed to calculate size of HTable \"" + 
tableName + "\".", e);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 7758987..536b338 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.QueryDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
@@ -38,7 +39,7 @@ import net.sf.ehcache.CacheManager;
 /**
  */
 @Component("cacheService")
-public class CacheService extends BasicService {
+public class CacheService extends BasicService implements InitializingBean {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CacheService.class);
     private static QueryDataSource queryDataSource = new QueryDataSource();
@@ -92,6 +93,11 @@ public class CacheService extends BasicService {
         this.cubeService = cubeService;
     }
 
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        
Broadcaster.getInstance(getConfig()).registerStaticListener(cacheSyncListener, 
"cube");
+    }
+
     public void wipeProjectCache(String project) {
         if (project == null)
             annouceWipeCache("all", "update", "all");
@@ -106,10 +112,6 @@ public class CacheService extends BasicService {
 
     public void notifyMetadataChange(String entity, Event event, String 
cacheKey) throws IOException {
         Broadcaster broadcaster = Broadcaster.getInstance(getConfig());
-
-        // broadcaster can be clearCache() too, make sure listener is 
registered; re-registration will be ignored
-        broadcaster.registerListener(cacheSyncListener, "cube");
-
         broadcaster.notifyListener(entity, event, cacheKey);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecf4819e/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index e79bab9..0fcee44 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -52,7 +52,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
-import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.request.MetricsRequest;
@@ -417,8 +416,9 @@ public class CubeService extends BasicService implements 
InitializingBean {
      * if error happens
      * @throws IOException Exception when HTable resource is not closed 
correctly.
      */
-    public HBaseResponse getHTableInfo(String tableName) throws IOException {
-        HBaseResponse hr = htableInfoCache.getIfPresent(tableName);
+    public HBaseResponse getHTableInfo(String cubeName, String tableName) 
throws IOException {
+        String key = cubeName + "/" + tableName;
+        HBaseResponse hr = htableInfoCache.getIfPresent(key);
         if (null != hr) {
             return hr;
         }
@@ -426,6 +426,8 @@ public class CubeService extends BasicService implements 
InitializingBean {
         hr = new HBaseResponse();
         if ("hbase".equals(getConfig().getMetadataUrl().getScheme())) {
             try {
+                logger.debug("Loading HTable info " + cubeName + ", " + 
tableName);
+                
                 // use reflection to isolate NoClassDef errors when HBase is 
not available
                 hr = (HBaseResponse) 
Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")//
                         .getMethod("getHBaseInfo", new Class[] { String.class, 
KylinConfig.class })//
@@ -435,7 +437,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
             }
         }
 
-        htableInfoCache.put(tableName, hr);
+        htableInfoCache.put(key, hr);
         return hr;
     }
 
@@ -716,8 +718,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
 
     @Override
     public void afterPropertiesSet() throws Exception {
-        Broadcaster.getInstance(getConfig()).registerListener(new 
HTableInfoSyncListener(), "cube");
-        logger.info("HTableInfoSyncListener is on.");
+        Broadcaster.getInstance(getConfig()).registerStaticListener(new 
HTableInfoSyncListener(), "cube");
     }
 
     private class HTableInfoSyncListener extends Broadcaster.Listener {
@@ -730,17 +731,11 @@ public class CubeService extends BasicService implements 
InitializingBean {
         public void onEntityChange(Broadcaster broadcaster, String entity, 
Broadcaster.Event event, String cacheKey)
                 throws IOException {
             String cubeName = cacheKey;
-
-            CubeInstance cube = getCubeManager().getCube(cubeName);
-            if (null == cube) {
-                throw new InternalErrorException("Cannot find cube " + 
cubeName);
-            }
-
-            List<String> htableNameList = 
Lists.newArrayListWithExpectedSize(cube.getSegments().size());
-            for (CubeSegment segment : cube.getSegments()) {
-                htableNameList.add(segment.getStorageLocationIdentifier());
+            String keyPrefix = cubeName + "/";
+            for (String k : htableInfoCache.asMap().keySet()) {
+                if (k.startsWith(keyPrefix))
+                    htableInfoCache.invalidate(k);
             }
-            htableInfoCache.invalidateAll(htableNameList);
         }
     }
 }

Reply via email to