Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 9203b230e -> 114e0ed02


KYLIN-1205 Make some HBase scan tunings configurable


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/114e0ed0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/114e0ed0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/114e0ed0

Branch: refs/heads/2.x-staging
Commit: 114e0ed021e4d12e494fab1d550ed3f8cc52032f
Parents: 9203b23
Author: Li, Yang <[email protected]>
Authored: Wed Dec 9 16:19:38 2015 +0800
Committer: Li, Yang <[email protected]>
Committed: Wed Dec 9 16:19:38 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  8 +++++++
 .../java/org/apache/kylin/gridtable/GTInfo.java |  8 +++++++
 .../apache/kylin/rest/service/CacheService.java |  2 ++
 .../kylin/storage/hbase/HBaseConnection.java    |  4 ++++
 .../hbase/cube/v1/CubeSegmentTupleIterator.java | 24 ++++++++++++++------
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     | 23 ++++++++++++++-----
 .../kylin/storage/hbase/cube/v2/RawScan.java    |  7 +++++-
 .../storage/hbase/steps/HBaseMROutput2.java     |  2 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |  2 +-
 .../kylin/storage/hbase/util/RowCounterCLI.java |  2 +-
 10 files changed, 65 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 431691c..2b1b908 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -521,6 +521,14 @@ public class KylinConfig implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
     }
 
+    public int getHBaseScanCacheRows() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024"));
+    }
+
+    public int getHBaseScanMaxResultSize() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
+    }
+
     public boolean isCubingInMem() {
         return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index d3a03d1..5479449 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -87,6 +87,14 @@ public class GTInfo {
         return max;
     }
 
+    public ImmutableBitSet selectColumns(ImmutableBitSet selectedColBlocks) {
+        ImmutableBitSet result = ImmutableBitSet.EMPTY;
+        for (int i = 0; i < selectedColBlocks.trueBitCount(); i++) {
+            result = result.or(colBlocks[selectedColBlocks.trueBitAt(i)]);
+        }
+        return result;
+    }
+    
     public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) {
         if (columns == null)
             columns = colAll;

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 0150b2e..89c525d 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -49,6 +49,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,6 +219,7 @@ public class CacheService extends BasicService {
                 ProjectManager.clearCache();
                 KafkaConfigManager.clearCache();
                 StreamingManager.clearCache();
+                HBaseConnection.clearConnCache();
 
                 cleanAllDataCache();
                 removeAllOLAPDataSources();

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d1a92a0..661e8e4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -65,6 +65,10 @@ public class HBaseConnection {
             }
         });
     }
+    
+    public static void clearConnCache() {
+        ConnPool.clear();
+    }
 
     private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 50cfefa..43fb1b5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
@@ -62,8 +63,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     public static final Logger logger = LoggerFactory.getLogger(CubeSegmentTupleIterator.class);
 
-    public static final int SCAN_CACHE = 1024;
-
     protected final CubeSegment cubeSeg;
     private final TupleFilter filter;
     private final Collection<TblColRef> groupBy;
@@ -83,7 +82,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     protected int scanCountDelta;
     protected Tuple next;
     protected final Cuboid cuboid;
-    
+
     public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
             List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -109,7 +108,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
     }
-    
+
     @Override
     public boolean hasNext() {
 
@@ -138,7 +137,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return true;
     }
 
-    
     @Override
     public Tuple next() {
         if (next == null) {
@@ -202,8 +200,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     private Scan buildScan(HBaseKeyRange keyRange) {
         Scan scan = new Scan();
-        scan.setCaching(SCAN_CACHE);
-        scan.setCacheBlocks(true);
+        tuneScanParameters(scan);
         scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
         for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
             HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
@@ -216,6 +213,19 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return scan;
     }
 
+    private void tuneScanParameters(Scan scan) {
+        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
+
+        scan.setCaching(config.getHBaseScanCacheRows());
+        scan.setMaxResultSize(config.getHBaseScanMaxResultSize());
+        scan.setCacheBlocks(true);
+
+        // cache less when there are memory hungry measures
+        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
+            scan.setCaching(scan.getCaching() / 10);
+        }
+    }
+
     private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
         List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
         if (fuzzyKeys != null && fuzzyKeys.size() > 0) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 6a6c887..477d32d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
@@ -39,8 +40,6 @@ public abstract class CubeHBaseRPC {
 
     public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
 
-    public static final int SCAN_CACHE = 1024;
-
     final protected CubeSegment cubeSeg;
     final protected Cuboid cuboid;
     final protected GTInfo fullGTInfo;
@@ -60,7 +59,8 @@ public abstract class CubeHBaseRPC {
 
     public static Scan buildScan(RawScan rawScan) {
         Scan scan = new Scan();
-        scan.setCaching(SCAN_CACHE);
+        scan.setCaching(rawScan.hbaseCaching);
+        scan.setMaxResultSize(rawScan.hbaseMaxResultSize);
         scan.setCacheBlocks(true);
         scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
 
@@ -110,13 +110,24 @@ public abstract class CubeHBaseRPC {
         Preconditions.checkState(startKeys.size() == endKeys.size());
         List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
 
+        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
+        int hbaseCaching = config.getHBaseScanCacheRows();
+        int hbaseMaxResultSize = config.getHBaseScanMaxResultSize();
+        if (isMemoryHungry(selectedColBlocks))
+            hbaseCaching /= 10;
+        
         for (short i = 0; i < startKeys.size(); ++i) {
-            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys));
+            ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys, hbaseCaching, hbaseMaxResultSize));
         }
         return ret;
 
     }
 
+    private boolean isMemoryHungry(ImmutableBitSet selectedColBlocks) {
+        ImmutableBitSet selectColumns = fullGTInfo.selectColumns(selectedColBlocks);
+        return fullGTInfo.getMaxColumnLength(selectColumns) > 1024;
+    }
+
     /**
      * translate GTRecord format fuzzy keys to hbase expected format
      * @return
@@ -219,8 +230,8 @@ public abstract class CubeHBaseRPC {
 
     private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
         List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
-        for (org.apache.kylin.common.util.Pair pair : pairList) {
-            org.apache.hadoop.hbase.util.Pair element = new org.apache.hadoop.hbase.util.Pair(pair.getFirst(), pair.getSecond());
+        for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
+            org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
             result.add(element);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index ad4263f..9707a99 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -29,13 +29,18 @@ public class RawScan {
     public byte[] endKey;
     public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns
     public List<Pair<byte[], byte[]>> fuzzyKey;
+    public int hbaseCaching;
+    public int hbaseMaxResultSize;
 
-    public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, List<Pair<byte[], byte[]>> fuzzyKey) {
+    public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, //
+            List<Pair<byte[], byte[]>> fuzzyKey, int hbaseCaching, int hbaseMaxResultSize) {
 
         this.startKey = startKey;
         this.endKey = endKey;
         this.hbaseColumns = hbaseColumns;
         this.fuzzyKey = fuzzyKey;
+        this.hbaseCaching = hbaseCaching;
+        this.hbaseMaxResultSize = hbaseMaxResultSize;
     }
 
     public String getStartKeyAsString() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index e565e86..397f4fe 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -163,7 +163,7 @@ public class HBaseMROutput2 implements IMROutput2 {
             List<Scan> scans = new ArrayList<Scan>();
             for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
                 Scan scan = new Scan();
-                scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
                 scan.setCacheBlocks(false); // don't set to true for MR jobs
                 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
                 scans.add(scan);

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index dff1e51..ac304f5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -180,7 +180,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
                 List<Scan> scans = new ArrayList<Scan>();
                 for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
                     Scan scan = new Scan();
-                    scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                    scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
                     scan.setCacheBlocks(false); // don't set to true for MR jobs
                     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
                     scans.add(scan);

http://git-wip-us.apache.org/repos/asf/kylin/blob/114e0ed0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index 9cd6d23..038e954 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -45,7 +45,7 @@ public class RowCounterCLI {
         Configuration conf = HBaseConfiguration.create();
 
         Scan scan = new Scan();
-        scan.setCaching(1024);
+        scan.setCaching(512);
         scan.setCacheBlocks(true);
         scan.setStartRow(startKey);
         scan.setStopRow(endKey);

Reply via email to