Repository: kylin
Updated Branches:
  refs/heads/2.0-rc d8a964b4b -> d828780ec


KYLIN-1233 Configurable mem usage inside coprocessor


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d828780e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d828780e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d828780e

Branch: refs/heads/2.0-rc
Commit: d828780ec241f8792b7574ca1eaef186f0b485e4
Parents: d8a964b
Author: Yang Li <[email protected]>
Authored: Sun Dec 27 21:27:39 2015 +0800
Committer: Yang Li <[email protected]>
Committed: Sun Dec 27 21:28:46 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +++
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  4 +--
 .../hbase/cube/v2/CubeSegmentScanner.java       |  6 +++-
 .../coprocessor/endpoint/CubeVisitService.java  |  5 ++-
 .../hbase/util/DeployCoprocessorCLI.java        | 32 +++++++++++++++++++-
 5 files changed, 44 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index aee1bd8..5968411 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -420,6 +420,10 @@ public class KylinConfigBase implements Serializable {
         return Long.parseLong(this.getOptional("kylin.query.mem.budget", 
String.valueOf(3L * 1024 * 1024 * 1024)));
     }
 
+    public double getQueryCoprocessorMemGB() {
+        return 
Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0"));
+    }
+    
     public boolean isQuerySecureEnabled() {
         return 
Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 5bc4a00..9ca3fc8 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -223,11 +223,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 future.get(1, TimeUnit.HOURS);
             }
         } catch (InterruptedException e) {
-            throw new RuntimeException("Visiting cube by endpoint gets 
interrupted");
+            throw new RuntimeException("Visiting cube by endpoint gets 
interrupted", e);
         } catch (ExecutionException e) {
             throw new RuntimeException("Visiting cube throw exception", e);
         } catch (TimeoutException e) {
-            throw new RuntimeException("Visiting cube by endpoint timeout");
+            throw new RuntimeException("Visiting cube by endpoint timeout", e);
         }
 
         return new EndpointResultsAsGTScanner(fullGTInfo, 
rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get());

http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 33a1180..79e33fe 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -12,6 +12,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.DateFormat;
@@ -84,8 +85,11 @@ public class CubeSegmentScanner implements IGTScanner {
         trimmedInfoBytes = GTInfo.serialize(info);
         GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
 
+        KylinConfig config = cubeSeg.getCubeInstance().getConfig();
         for (GTScanRange range : scanRanges) {
-            scanRequests.add(new GTScanRequest(trimmedInfo, 
range.replaceGTInfo(trimmedInfo), gtDimensions, gtAggrGroups, gtAggrMetrics, 
gtAggrFuncs, gtFilter, allowPreAggregate));
+            GTScanRequest req = new GTScanRequest(trimmedInfo, 
range.replaceGTInfo(trimmedInfo), gtDimensions, gtAggrGroups, gtAggrMetrics, 
gtAggrFuncs, gtFilter, allowPreAggregate);
+            req.setAggrCacheGB(config.getQueryCoprocessorMemGB()); // limit 
the memory usage inside coprocessor
+            scanRequests.add(req);
         }
 
         scanner = new Scanner();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 6feed33..6981214 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -148,9 +148,8 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             InnerScannerAsIterator cellListIterator = new 
InnerScannerAsIterator(innerScanner);
 
             CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
-            if (behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
-                if (scanReq.getAggrCacheGB() <= 0)
-                    scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit 
from v1.0
+            if (behavior.ordinal() < 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());

http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index a5ae09c..1cdb9e9 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +56,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  */
 public class DeployCoprocessorCLI {
@@ -76,7 +79,15 @@ public class DeployCoprocessorCLI {
 
         List<String> tableNames = getHTableNames(kylinConfig);
         logger.info("Identify tables " + tableNames);
-
+        
+        if (args.length == 1) {
+            logger.info("Probe run, existing. Append argument 'all' or 
specific tables to execute.");
+            System.exit(0);
+        }
+        
+        tableNames = filterTables(tableNames, Arrays.asList(args).subList(1, 
args.length));
+        logger.info("Will execute tables " + tableNames);
+        
         Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, 
tableNames);
         logger.info("Old coprocessor jar: " + oldJarPaths);
 
@@ -94,6 +105,25 @@ public class DeployCoprocessorCLI {
         logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
     }
 
+    private static List<String> filterTables(List<String> tableNames, 
List<String> list) {
+        List<String> result = Lists.newArrayList();
+        for (String t : list) {
+            t = t.trim();
+            if (t.endsWith(","))
+                t = t.substring(0, t.length() - 1);
+            
+            if (t.equals("all")) {
+                result.addAll(tableNames);
+                break;
+            }
+            
+            if (tableNames.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
     public static void deployCoprocessor(HTableDescriptor tableDesc) {
         try {
             initHTableCoprocessor(tableDesc);

Reply via email to