This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.x in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.5.x by this push: new 19b2bf7 KYLIN-3752 Increase broadcaster's concurrency to avoid exceptions 19b2bf7 is described below commit 19b2bf7e5f544d2dc5513c7dd4b7e3ac90196a1e Author: xbirbird <31679225+xbirb...@users.noreply.github.com> AuthorDate: Wed Jan 2 10:43:19 2019 +0800 KYLIN-3752 Increase broadcaster's concurrency to avoid exceptions --- .../java/org/apache/kylin/metadata/cachesync/Broadcaster.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index d5ecc16..b67d10c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -91,14 +91,17 @@ public class Broadcaster { this.config = config; this.syncErrorHandler = getSyncErrorHandler(config); this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory()); - this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory()); - + final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config"); } - logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); + logger.debug("{} nodes in the cluster: {}", (nodes == null ? 0 : nodes.length), Arrays.toString(nodes)); + + int corePoolSize = (nodes == null || nodes.length < 1)? 1 : nodes.length; + int maximumPoolSize = (nodes == null || nodes.length < 1)? 10 : nodes.length * 2; + this.announceThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory()); announceMainLoop.execute(new Runnable() { @Override