KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f4595ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f4595ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f4595ae Branch: refs/heads/2.x-staging Commit: 2f4595ae9db582a5b1c498534916f90db22b55b2 Parents: f1dedab Author: Yang Li <liy...@apache.org> Authored: Mon Nov 9 11:15:12 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Nov 9 11:15:12 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/cache/CacheUpdater.java | 11 ---- .../kylin/common/cache/LocalCacheUpdater.java | 18 ------ .../kylin/common/cache/RemoteCacheUpdater.java | 14 ----- .../common/restclient/AbstractRestCache.java | 17 +++--- .../kylin/common/restclient/Broadcaster.java | 58 +++++++++++++++----- .../restclient/CaseInsensitiveStringCache.java | 6 +- .../common/restclient/SingleValueCache.java | 16 +++--- .../org/apache/kylin/cube/CubeDescManager.java | 3 +- .../java/org/apache/kylin/cube/CubeManager.java | 4 +- .../apache/kylin/metadata/MetadataManager.java | 10 +++- .../kylin/metadata/project/ProjectManager.java | 3 +- .../kylin/storage/hybrid/HybridManager.java | 4 +- .../engine/streaming/StreamingManager.java | 3 +- .../engine/streaming/cli/StreamingCLI.java | 7 +-- .../test_case_data/localmeta/kylin.properties | 2 +- .../kylin/invertedindex/IIDescManager.java | 3 +- .../apache/kylin/invertedindex/IIManager.java | 4 +- .../kylin/rest/service/CacheServiceTest.java | 6 +- .../kylin/source/kafka/KafkaConfigManager.java | 23 ++++---- 19 files changed, 105 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java deleted file mode 100644 index 615ee14..0000000 --- a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.kylin.common.cache; - -import org.apache.kylin.common.restclient.AbstractRestCache; -import org.apache.kylin.common.restclient.Broadcaster; - -/** - */ -@SuppressWarnings("rawtypes") -public interface CacheUpdater { - void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache); -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java deleted file mode 100644 index 8d3b648..0000000 --- a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.kylin.common.cache; - -import org.apache.kylin.common.restclient.AbstractRestCache; -import org.apache.kylin.common.restclient.Broadcaster; - -/** - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class LocalCacheUpdater implements CacheUpdater { - @Override - public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) { - if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) { - cache.putLocal(key, value); - } else if (syncAction == Broadcaster.EVENT.DROP) { - cache.removeLocal(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java deleted file mode 100644 index 2927d2d..0000000 --- a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.kylin.common.cache; - -import org.apache.kylin.common.restclient.AbstractRestCache; -import org.apache.kylin.common.restclient.Broadcaster; - -/** - */ -@SuppressWarnings("rawtypes") -public class RemoteCacheUpdater implements CacheUpdater { - @Override - public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) { - Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java index 68d9be5..fc030b4 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java @@ -18,8 +18,7 @@ package org.apache.kylin.common.restclient; -import org.apache.kylin.common.cache.CacheUpdater; -import org.apache.kylin.common.cache.RemoteCacheUpdater; +import org.apache.kylin.common.KylinConfig; /** * @author xjiang @@ -27,17 +26,17 @@ import org.apache.kylin.common.cache.RemoteCacheUpdater; */ public abstract class AbstractRestCache<K, V> { - protected static CacheUpdater cacheUpdater = new RemoteCacheUpdater(); - - public static void setCacheUpdater(CacheUpdater cu) { - cacheUpdater = cu; - } - + protected final KylinConfig config; protected final Broadcaster.TYPE syncType; - protected AbstractRestCache(Broadcaster.TYPE syncType) { + protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) { + this.config = config; this.syncType = syncType; } + + public Broadcaster getBroadcaster() { + return Broadcaster.getInstance(config); + } public abstract void put(K key, V value); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java index 80ec33c..871d77c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -43,24 +44,52 @@ public class Broadcaster { private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class); + // static cached instances + private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); + + public static Broadcaster getInstance(KylinConfig config) { + Broadcaster r = CACHE.get(config); + if (r != null) { + return r; + } + + synchronized (Broadcaster.class) { + r = CACHE.get(config); + if (r != null) { + return r; + } + + r = new Broadcaster(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one cubemanager singleton exist"); + } + return r; + } + } + + public static void clearCache() { + CACHE.clear(); + } + + // ============================================================================ + private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>(); private AtomicLong counter = new AtomicLong(); - static class BroadcasterHolder { - static final Broadcaster INSTANCE = new Broadcaster(); - } + private Broadcaster(final KylinConfig config) { + final String[] nodes = config.getRestServers(); + if (nodes == null || nodes.length < 1) { + logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); + broadcastEvents = null; // disable the broadcaster + return; + } + logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); - private Broadcaster() { Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { @Override public void run() { - final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers(); - if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster - logger.warn("There is no available rest server; check the 'kylin.rest.servers' config"); - return; - } - logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); final List<RestClient> restClients = Lists.newArrayList(); for (String node : nodes) { restClients.add(new RestClient(node)); @@ -90,10 +119,6 @@ public class Broadcaster { }); } - public static Broadcaster getInstance() { - return BroadcasterHolder.INSTANCE; - } - /** * Broadcast the cubedesc event out * @@ -101,6 +126,9 @@ public class Broadcaster { * event action */ public void queue(String type, String action, String key) { + if (broadcastEvents == null) + return; + try { counter.incrementAndGet(); broadcastEvents.putFirst(new BroadcastEvent(type, action, key)); @@ -138,7 +166,7 @@ public class Broadcaster { } public enum TYPE { - ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid"); + ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid"); private String text; TYPE(String text) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java index 68e3c04..2bcddbf 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java @@ -20,12 +20,14 @@ package org.apache.kylin.common.restclient; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.kylin.common.KylinConfig; + /** */ public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> { - public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) { - super(syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER)); + public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) { + super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java index cb6c286..9acfeca 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java @@ -25,6 +25,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.kylin.common.KylinConfig; + /** * @author xjiang * @@ -33,12 +35,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> { private final ConcurrentMap<K, V> innerCache; - public SingleValueCache(Broadcaster.TYPE syncType) { - this(syncType, new ConcurrentHashMap<K, V>()); + public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) { + this(config, syncType, new ConcurrentHashMap<K, V>()); } - public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) { - super(syncType); + public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) { + super(config, syncType); this.innerCache = innerCache; } @@ -48,9 +50,9 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> { innerCache.put(key, value); if (!exists) { - cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this); + getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString()); } else { - cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this); + getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString()); } } @@ -64,7 +66,7 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> { innerCache.remove(key); if (exists) { - cacheUpdater.updateCache(key, null, Broadcaster.EVENT.DROP, syncType, this); + getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index 8e75d29..c50836c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -57,7 +57,7 @@ public class CubeDescManager { private KylinConfig config; // name ==> CubeDesc - private CaseInsensitiveStringCache<CubeDesc> cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(Broadcaster.TYPE.CUBE_DESC); + private CaseInsensitiveStringCache<CubeDesc> cubeDescMap; public static CubeDescManager getInstance(KylinConfig config) { CubeDescManager r = CACHE.get(config); @@ -90,6 +90,7 @@ public class CubeDescManager { private CubeDescManager(KylinConfig config) throws IOException { logger.info("Initializing CubeDescManager with config " + config); this.config = config; + this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC); reloadAllCubeDesc(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 9b7a024..a249cd7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -116,7 +116,7 @@ public class CubeManager implements IRealizationProvider { private KylinConfig config; // cube name ==> CubeInstance - private CaseInsensitiveStringCache<CubeInstance> cubeMap = new CaseInsensitiveStringCache<CubeInstance>(Broadcaster.TYPE.CUBE); + private CaseInsensitiveStringCache<CubeInstance> cubeMap; // "table/column" ==> lookup table // private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA); @@ -126,7 +126,7 @@ public class CubeManager implements IRealizationProvider { private CubeManager(KylinConfig config) throws IOException { logger.info("Initializing CubeManager with config " + config); this.config = config; - + this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE); loadAllCubeInstance(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java index b7e7dc5..c907afd 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java @@ -103,11 +103,11 @@ public class MetadataManager { private KylinConfig config; // table name ==> SourceTable - private CaseInsensitiveStringCache<TableDesc> srcTableMap = new CaseInsensitiveStringCache<TableDesc>(Broadcaster.TYPE.TABLE); + private CaseInsensitiveStringCache<TableDesc> srcTableMap; // name => value - private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(Broadcaster.TYPE.TABLE); + private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap; // name => DataModelDesc - private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(Broadcaster.TYPE.DATA_MODEL); + private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap; private MetadataManager(KylinConfig config) throws IOException { init(config); @@ -198,6 +198,10 @@ public class MetadataManager { private void init(KylinConfig config) throws IOException { this.config = config; + this.srcTableMap = new CaseInsensitiveStringCache<TableDesc>(config, Broadcaster.TYPE.TABLE); + this.srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(config, Broadcaster.TYPE.TABLE); + this.dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(config, Broadcaster.TYPE.DATA_MODEL); + reloadAllSourceTable(); reloadAllSourceTableExd(); reloadAllDataModel(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index fd41f59..8304128 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -81,11 +81,12 @@ public class ProjectManager { private KylinConfig config; private ProjectL2Cache l2Cache; // project name => ProjrectInstance - private CaseInsensitiveStringCache<ProjectInstance> projectMap = new CaseInsensitiveStringCache<ProjectInstance>(Broadcaster.TYPE.PROJECT); + private CaseInsensitiveStringCache<ProjectInstance> projectMap; private ProjectManager(KylinConfig config) throws IOException { logger.info("Initializing ProjectManager with metadata url " + config); this.config = config; + this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT); this.l2Cache = new ProjectL2Cache(this); reloadAllProjects(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java index 0f00f1a..9392ef5 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java @@ -76,12 +76,12 @@ public class HybridManager implements IRealizationProvider { private KylinConfig config; - private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID); + private CaseInsensitiveStringCache<HybridInstance> hybridMap; private HybridManager(KylinConfig config) throws IOException { logger.info("Initializing HybridManager with config " + config); this.config = config; - + this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID); loadAllHybridInstance(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index fa7d0f8..8cabe1b 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -79,7 +79,7 @@ public class StreamingManager { private KylinConfig config; // name ==> StreamingConfig - private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING); + private CaseInsensitiveStringCache<StreamingConfig> streamingMap; public static void clearCache() { CACHE.clear(); @@ -87,6 +87,7 @@ public class StreamingManager { private StreamingManager(KylinConfig config) throws IOException { this.config = config; + this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING); reloadAllStreaming(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index e3a7133..a73a6ac 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -34,11 +34,10 @@ package org.apache.kylin.engine.streaming.cli; -import com.google.common.base.Preconditions; +import java.util.List; + import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.cache.RemoteCacheUpdater; -import org.apache.kylin.common.restclient.AbstractRestCache; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.streaming.BootstrapConfig; import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; @@ -48,7 +47,7 @@ import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import com.google.common.base.Preconditions; public class StreamingCLI { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index 92773a1..48f01f5 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -1,7 +1,7 @@ ## Config for Kylin Engine ## # List of web servers in use, this enables one web server instance to sync up with other servers. -kylin.rest.servers=localhost:7070 +#kylin.rest.servers=localhost:7070 # The metadata store in hbase kylin.metadata.url= http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java index a166ae7..917fe46 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java @@ -54,7 +54,7 @@ public class IIDescManager { private KylinConfig config; // name ==> IIDesc - private CaseInsensitiveStringCache<IIDesc> iiDescMap = new CaseInsensitiveStringCache<IIDesc>(Broadcaster.TYPE.INVERTED_INDEX_DESC); + private CaseInsensitiveStringCache<IIDesc> iiDescMap; public static IIDescManager getInstance(KylinConfig config) { IIDescManager r = CACHE.get(config); @@ -87,6 +87,7 @@ public class IIDescManager { private IIDescManager(KylinConfig config) throws IOException { logger.info("Initializing IIDescManager with config " + config); this.config = config; + this.iiDescMap = new CaseInsensitiveStringCache<IIDesc>(config, Broadcaster.TYPE.INVERTED_INDEX_DESC); reloadAllIIDesc(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java index 5633004..b6dfdf1 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java @@ -92,7 +92,7 @@ public class IIManager implements IRealizationProvider { private KylinConfig config; // ii name ==> IIInstance - private CaseInsensitiveStringCache<IIInstance> iiMap = new CaseInsensitiveStringCache<IIInstance>(Broadcaster.TYPE.INVERTED_INDEX); + private CaseInsensitiveStringCache<IIInstance> iiMap; // for generation hbase table name of a new segment private Multimap<String, String> usedStorageLocation = HashMultimap.create(); @@ -100,7 +100,7 @@ public class IIManager implements IRealizationProvider { private IIManager(KylinConfig config) throws IOException { logger.info("Initializing IIManager with config " + config); this.config = config; - + this.iiMap = new CaseInsensitiveStringCache<IIInstance>(config, Broadcaster.TYPE.INVERTED_INDEX); loadAllIIInstance(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java index af1bbc0..b90c10a 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java @@ -71,7 +71,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { public static void beforeClass() throws Exception { staticCreateTestMetadata(); configA = KylinConfig.getInstanceFromEnv(); + configA.setProperty("kylin.rest.servers", "localhost:7070"); configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam()); + configB.setProperty("kylin.rest.servers", "localhost:7070"); configB.setMetadataUrl("../examples/test_metadata"); server = new Server(7070); @@ -209,7 +211,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { @Test public void testCubeCRUD() throws Exception { - final Broadcaster broadcaster = Broadcaster.getInstance(); + final Broadcaster broadcaster = Broadcaster.getInstance(configA); broadcaster.getCounterAndClear(); getStore().deleteResource("/cube/a_whole_new_cube.json"); @@ -306,7 +308,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase { public void testMetaCRUD() throws Exception { final MetadataManager metadataManager = MetadataManager.getInstance(configA); final MetadataManager metadataManagerB = MetadataManager.getInstance(configB); - final Broadcaster broadcaster = Broadcaster.getInstance(); + final Broadcaster broadcaster = Broadcaster.getInstance(configA); broadcaster.getCounterAndClear(); TableDesc tableDesc = createTestTableDesc(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index 3032d13..8cf51b6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -34,10 +34,12 @@ package org.apache.kylin.source.kafka; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.MapType; -import com.fasterxml.jackson.databind.type.SimpleType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -45,17 +47,15 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; -import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.SimpleType; /** */ @@ -69,7 +69,7 @@ public class KafkaConfigManager { private KylinConfig config; // name ==> StreamingConfig - private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA); + private CaseInsensitiveStringCache<KafkaConfig> kafkaMap; public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class); @@ -79,6 +79,7 @@ public class KafkaConfigManager { private KafkaConfigManager(KylinConfig config) throws IOException { this.config = config; + this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA); reloadAllKafkaConfig(); }