KYLIN-2409: Change in-memory cubing to use a single thread by default
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/21969753 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/21969753 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/21969753 Branch: refs/heads/master-hbase1.x Commit: 21969753cc66efa22bc9fc933af46346e9846631 Parents: b49c9e3 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Jan 19 11:52:17 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Jan 19 17:37:42 2017 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/kylin/common/KylinConfigBase.java | 6 +++++- .../kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java | 2 +- .../java/org/apache/kylin/gridtable/GTAggregateScanner.java | 9 +++++++++ .../org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java | 2 ++ 4 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d6774ff..74903d5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -300,6 +300,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-split-limit", "500")); } + public int getCubeAlgorithmInMemConcurrentThreads() { + return Integer.parseInt(getOptional("kylin.cube.algorithm.inmem-concurrent-threads", "1")); + } + public boolean isIgnoreCubeSignatureInconsistency() { return 
Boolean.parseBoolean(getOptional("kylin.cube.ignore-signature-inconsistency", "false")); } @@ -744,7 +748,7 @@ abstract public class KylinConfigBase implements Serializable { } public int getYarnStatusCheckIntervalSeconds() { - return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "60")); + return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10")); } // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index 651203a..c7a4a05 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -46,7 +46,7 @@ abstract public class AbstractInMemCubeBuilder { final protected CubeDesc cubeDesc; final protected Map<TblColRef, Dictionary<String>> dictionaryMap; - protected int taskThreadCount = 4; + protected int taskThreadCount = 1; protected int reserveMemoryMB = 100; public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 55c04c6..9158aa3 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -36,6 +36,7 @@ import java.util.Map.Entry; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.Pair; @@ -178,11 +179,16 @@ public class GTAggregateScanner implements IGTScanner { final List<Dump> dumps; final int keyLength; final boolean[] compareMask; + boolean compareAll = true; final BufferedMeasureCodec measureCodec; final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { @Override public int compare(byte[] o1, byte[] o2) { + if (compareAll) { + return Bytes.compareTo(o1, o2); + } + int result = 0; // profiler shows this check is slow // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length); @@ -206,6 +212,9 @@ public class GTAggregateScanner implements IGTScanner { public AggregationCache() { compareMask = createCompareMask(); + for (boolean l : compareMask) { + compareAll = compareAll && l; + } keyLength = compareMask.length; dumps = Lists.newArrayList(); aggBufMap = createBuffMap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/21969753/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 116d5e0..c0ff2f2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -94,8 +94,10 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, 
Object, ByteArr dictionaryMap.put(col, cubeSegment.getDictionary(col)); } + int taskCount = config.getCubeAlgorithmInMemConcurrentThreads(); DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration())); + cubeBuilder.setConcurrentThreads(taskCount); ExecutorService executorService = Executors.newSingleThreadExecutor(); future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));