KYLIN-1205 Make HBase scan parameters configurable and allow reset of HConnection
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b0c5fd3b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b0c5fd3b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b0c5fd3b Branch: refs/heads/1.x-HBase1.1.3 Commit: b0c5fd3bc7cce69dd5f0413d172fd7c7a79d9f5a Parents: e85b243 Author: Li, Yang <yang...@ebay.com> Authored: Tue Dec 8 10:43:03 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Tue Dec 8 10:43:03 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 8 ++++++++ .../common/persistence/HBaseConnection.java | 4 ++++ .../apache/kylin/rest/service/CacheService.java | 2 ++ .../storage/hbase/CubeSegmentTupleIterator.java | 19 +++++++++++++++---- 4 files changed, 29 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b0c5fd3b/common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index 9645b81..7816487 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -499,7 +499,15 @@ public class KylinConfig { public int getHBaseKeyValueSize() { return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760")); } + + public int getHBaseScanCacheRows() { + return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024")); + } + public int getHBaseScanMaxResultSize() { + return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB + } + public String getHbaseDefaultCompressionCodec() { return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC, ""); http://git-wip-us.apache.org/repos/asf/kylin/blob/b0c5fd3b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java index dcefc24..9c86376 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java @@ -58,6 +58,10 @@ public class HBaseConnection { } }); } + + public static void clearCache() { + ConnPool.clear(); + } public static HConnection get(String url) { // find configuration http://git-wip-us.apache.org/repos/asf/kylin/blob/b0c5fd3b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index 38d7400..2328592 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -21,6 +21,7 @@ package org.apache.kylin.rest.service; import java.io.IOException; import java.util.List; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeManager; @@ -101,6 +102,7 @@ public class CacheService extends BasicService { RealizationRegistry.clearCache(); ProjectManager.clearCache(); BasicService.resetOLAPDataSources(); + HBaseConnection.clearCache(); break; default: throw new RuntimeException("invalid cacheType:" + cacheType); http://git-wip-us.apache.org/repos/asf/kylin/blob/b0c5fd3b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java index 9efbb79..fa4ccd7 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.Bytes; @@ -72,8 +73,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator { public static final Logger logger = LoggerFactory.getLogger(CubeSegmentTupleIterator.class); - public static final int SCAN_CACHE = 1024; - private final CubeInstance cube; private final CubeSegment cubeSeg; private final Collection<TblColRef> dimensions; @@ -252,8 +251,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private Scan buildScan(HBaseKeyRange keyRange) { Scan scan = new Scan(); - scan.setCaching(SCAN_CACHE); - scan.setCacheBlocks(true); + tuneScanParameters(scan); scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); for (RowValueDecoder valueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn(); @@ -266,6 +264,19 @@ public class CubeSegmentTupleIterator implements ITupleIterator { return scan; } + private void tuneScanParameters(Scan scan) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + + scan.setCaching(config.getHBaseScanCacheRows()); + scan.setMaxResultSize(config.getHBaseScanMaxResultSize()); + scan.setCacheBlocks(true); + + // cache less when there are memory hungry measures + if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) { + scan.setCaching(scan.getCaching() / 10); + } + } + private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) { List<Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys(); if (fuzzyKeys != null && fuzzyKeys.size() > 0) {