Repository: kylin Updated Branches: refs/heads/2.x-staging 9203b230e -> 114e0ed02
KYLIN-1205 Make some HBase scan tunings configurable Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/114e0ed0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/114e0ed0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/114e0ed0 Branch: refs/heads/2.x-staging Commit: 114e0ed021e4d12e494fab1d550ed3f8cc52032f Parents: 9203b23 Author: Li, Yang <[email protected]> Authored: Wed Dec 9 16:19:38 2015 +0800 Committer: Li, Yang <[email protected]> Committed: Wed Dec 9 16:19:38 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 8 +++++++ .../java/org/apache/kylin/gridtable/GTInfo.java | 8 +++++++ .../apache/kylin/rest/service/CacheService.java | 2 ++ .../kylin/storage/hbase/HBaseConnection.java | 4 ++++ .../hbase/cube/v1/CubeSegmentTupleIterator.java | 24 ++++++++++++++------ .../storage/hbase/cube/v2/CubeHBaseRPC.java | 23 ++++++++++++++----- .../kylin/storage/hbase/cube/v2/RawScan.java | 7 +++++- .../storage/hbase/steps/HBaseMROutput2.java | 2 +- .../hbase/steps/HBaseMROutput2Transition.java | 2 +- .../kylin/storage/hbase/util/RowCounterCLI.java | 2 +- 10 files changed, 65 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 431691c..2b1b908 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -521,6 +521,14 @@ public class KylinConfig implements Serializable { return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760")); } + public int getHBaseScanCacheRows() { + return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024")); + } + + public int getHBaseScanMaxResultSize() { + return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB + } + public boolean isCubingInMem() { return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index d3a03d1..5479449 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ -87,6 +87,14 @@ public class GTInfo { return max; } + public ImmutableBitSet selectColumns(ImmutableBitSet selectedColBlocks) { + ImmutableBitSet result = ImmutableBitSet.EMPTY; + for (int i = 0; i < selectedColBlocks.trueBitCount(); i++) { + result = result.or(colBlocks[selectedColBlocks.trueBitAt(i)]); + } + return result; + } + public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) { if (columns == null) columns = colAll; http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/server/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java index 0150b2e..89c525d 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -49,6 +49,7 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.enumerator.OLAPQuery; import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hybrid.HybridManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,6 +219,7 @@ public class CacheService extends BasicService { ProjectManager.clearCache(); KafkaConfigManager.clearCache(); StreamingManager.clearCache(); + HBaseConnection.clearConnCache(); cleanAllDataCache(); removeAllOLAPDataSources(); http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index d1a92a0..661e8e4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -65,6 +65,10 @@ public class HBaseConnection { } }); } + + public static void clearConnCache() { + ConnPool.clear(); + } private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 50cfefa..43fb1b5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeSegment; @@ -62,8 +63,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator { public static final Logger logger = LoggerFactory.getLogger(CubeSegmentTupleIterator.class); - public static final int SCAN_CACHE = 1024; - protected final CubeSegment cubeSeg; private final TupleFilter filter; private final Collection<TblColRef> groupBy; @@ -83,7 +82,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { protected int scanCountDelta; protected Tuple next; protected final Cuboid cuboid; - + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, // Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, // List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { @@ -109,7 +108,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { throw new StorageException("Error when open connection to table " + tableName, t); } } - + @Override public boolean hasNext() { @@ -138,7 +137,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator { return true; } - @Override public Tuple next() { if (next == null) { @@ -202,8 +200,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private Scan buildScan(HBaseKeyRange keyRange) { Scan scan = new Scan(); - scan.setCaching(SCAN_CACHE); - scan.setCacheBlocks(true); + tuneScanParameters(scan); scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); for (RowValueDecoder valueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn(); @@ -216,6 +213,19 @@ public class CubeSegmentTupleIterator implements ITupleIterator { return scan; } + private void tuneScanParameters(Scan scan) { + KylinConfig config = cubeSeg.getCubeDesc().getConfig(); + + scan.setCaching(config.getHBaseScanCacheRows()); + scan.setMaxResultSize(config.getHBaseScanMaxResultSize()); + scan.setCacheBlocks(true); + + // cache less when there are memory hungry measures + if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) { + scan.setCaching(scan.getCaching() / 10); + } + } + private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) { List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys(); if (fuzzyKeys != null && fuzzyKeys.size() > 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 6a6c887..477d32d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -11,6 +11,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; @@ -39,8 +40,6 @@ public abstract class CubeHBaseRPC { public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class); - public static final int SCAN_CACHE = 1024; - final protected CubeSegment cubeSeg; final protected Cuboid cuboid; final protected GTInfo fullGTInfo; @@ -60,7 +59,8 @@ public abstract class CubeHBaseRPC { public static Scan buildScan(RawScan rawScan) { Scan scan = new Scan(); - scan.setCaching(SCAN_CACHE); + scan.setCaching(rawScan.hbaseCaching); + scan.setMaxResultSize(rawScan.hbaseMaxResultSize); scan.setCacheBlocks(true); scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); @@ -110,13 +110,24 @@ public abstract class CubeHBaseRPC { Preconditions.checkState(startKeys.size() == endKeys.size()); List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); + KylinConfig config = cubeSeg.getCubeDesc().getConfig(); + int hbaseCaching = config.getHBaseScanCacheRows(); + int hbaseMaxResultSize = config.getHBaseScanMaxResultSize(); + if (isMemoryHungry(selectedColBlocks)) + hbaseCaching /= 10; + for (short i = 0; i < startKeys.size(); ++i) { - ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys)); + ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize)); } return ret; } + private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) { + ImmutableBitSet selectColumns = fullGTInfo.selectColumns(selectedColBlocks); + return fullGTInfo.getMaxColumnLength(selectColumns) > 1024; + } + /** * translate GTRecord format fuzzy keys to hbase expected format * @return @@ -219,8 +230,8 @@ public abstract class CubeHBaseRPC { private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) { List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList(); - for (org.apache.kylin.common.util.Pair pair : pairList) { - org.apache.hadoop.hbase.util.Pair element = new org.apache.hadoop.hbase.util.Pair(pair.getFirst(), pair.getSecond()); + for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) { + org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond()); result.add(element); } http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java index ad4263f..9707a99 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java @@ -29,13 +29,18 @@ public class RawScan { public byte[] endKey; public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns public List<Pair<byte[], byte[]>> fuzzyKey; + public int hbaseCaching; + public int hbaseMaxResultSize; - public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, List<Pair<byte[], byte[]>> fuzzyKey) { + public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, // + List<Pair<byte[], byte[]>> fuzzyKey, int hbaseCaching, int hbaseMaxResultSize) { this.startKey = startKey; this.endKey = endKey; this.hbaseColumns = hbaseColumns; this.fuzzyKey = fuzzyKey; + this.hbaseCaching = hbaseCaching; + this.hbaseMaxResultSize = hbaseMaxResultSize; } public String getStartKeyAsString() { http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java index e565e86..397f4fe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java @@ -163,7 +163,7 @@ public class HBaseMROutput2 implements IMROutput2 { List<Scan> scans = new ArrayList<Scan>(); for (String htable : new HBaseMRSteps(seg).getMergingHTables()) { Scan scan = new Scan(); - scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs + scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don't set to true for MR jobs scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable)); scans.add(scan); http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java index dff1e51..ac304f5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java @@ -180,7 +180,7 @@ public class HBaseMROutput2Transition implements IMROutput2 { List<Scan> scans = new ArrayList<Scan>(); for (String htable : new HBaseMRSteps(seg).getMergingHTables()) { Scan scan = new Scan(); - scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs + scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don't set to true for MR jobs scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable)); scans.add(scan); http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java index 9cd6d23..038e954 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java @@ -45,7 +45,7 @@ public class RowCounterCLI { Configuration conf = HBaseConfiguration.create(); Scan scan = new Scan(); - scan.setCaching(1024); + scan.setCaching(512); scan.setCacheBlocks(true); scan.setStartRow(startKey); scan.setStopRow(endKey);
