Repository: kylin Updated Branches: refs/heads/2.0-rc d8a964b4b -> d828780ec
KYLIN-1233 Configurable mem usage inside coprocessor Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d828780e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d828780e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d828780e Branch: refs/heads/2.0-rc Commit: d828780ec241f8792b7574ca1eaef186f0b485e4 Parents: d8a964b Author: Yang Li <[email protected]> Authored: Sun Dec 27 21:27:39 2015 +0800 Committer: Yang Li <[email protected]> Committed: Sun Dec 27 21:28:46 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 +++ .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 4 +-- .../hbase/cube/v2/CubeSegmentScanner.java | 6 +++- .../coprocessor/endpoint/CubeVisitService.java | 5 ++- .../hbase/util/DeployCoprocessorCLI.java | 32 +++++++++++++++++++- 5 files changed, 44 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index aee1bd8..5968411 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -420,6 +420,10 @@ public class KylinConfigBase implements Serializable { return Long.parseLong(this.getOptional("kylin.query.mem.budget", String.valueOf(3L * 1024 * 1024 * 1024))); } + public double getQueryCoprocessorMemGB() { + return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0")); + } + public boolean isQuerySecureEnabled() { return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 5bc4a00..9ca3fc8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -223,11 +223,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { future.get(1, TimeUnit.HOURS); } } catch (InterruptedException e) { - throw new RuntimeException("Visiting cube by endpoint gets interrupted"); + throw new RuntimeException("Visiting cube by endpoint gets interrupted", e); } catch (ExecutionException e) { throw new RuntimeException("Visiting cube throw exception", e); } catch (TimeoutException e) { - throw new RuntimeException("Visiting cube by endpoint timeout"); + throw new RuntimeException("Visiting cube by endpoint timeout", e); } return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get()); http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 33a1180..79e33fe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -12,6 +12,7 @@ import java.util.NoSuchElementException; import java.util.Set; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.DateFormat; @@ -84,8 +85,11 @@ public class CubeSegmentScanner implements IGTScanner { trimmedInfoBytes = GTInfo.serialize(info); GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes); + KylinConfig config = cubeSeg.getCubeInstance().getConfig(); for (GTScanRange range : scanRanges) { - scanRequests.add(new GTScanRequest(trimmedInfo, range.replaceGTInfo(trimmedInfo), gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate)); + GTScanRequest req = new GTScanRequest(trimmedInfo, range.replaceGTInfo(trimmedInfo), gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate); + req.setAggrCacheGB(config.getQueryCoprocessorMemGB()); // limit the memory usage inside coprocessor + scanRequests.add(req); } scanner = new Scanner(); http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 6feed33..6981214 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -148,9 +148,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); - if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { - if (scanReq.getAggrCacheGB() <= 0) - scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit from v1.0 + if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { + scanReq.setAggrCacheGB(0); // disable mem check if so told } IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); http://git-wip-us.apache.org/repos/asf/kylin/blob/d828780e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index a5ae09c..1cdb9e9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -55,6 +56,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** */ public class DeployCoprocessorCLI { @@ -76,7 +79,15 @@ public class DeployCoprocessorCLI { List<String> tableNames = getHTableNames(kylinConfig); logger.info("Identify tables " + tableNames); - + + if (args.length == 1) { + logger.info("Probe run, existing. Append argument 'all' or specific tables to execute."); + System.exit(0); + } + + tableNames = filterTables(tableNames, Arrays.asList(args).subList(1, args.length)); + logger.info("Will execute tables " + tableNames); + Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames); logger.info("Old coprocessor jar: " + oldJarPaths); @@ -94,6 +105,25 @@ public class DeployCoprocessorCLI { logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); } + private static List<String> filterTables(List<String> tableNames, List<String> list) { + List<String> result = Lists.newArrayList(); + for (String t : list) { + t = t.trim(); + if (t.endsWith(",")) + t = t.substring(0, t.length() - 1); + + if (t.equals("all")) { + result.addAll(tableNames); + break; + } + + if (tableNames.contains(t)) { + result.add(t); + } + } + return result; + } + public static void deployCoprocessor(HTableDescriptor tableDesc) { try { initHTableCoprocessor(tableDesc);
