APACHE-KYLIN-2732: Introduce base cuboid as a new input for cubing job
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/386a2ff7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/386a2ff7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/386a2ff7 Branch: refs/heads/yaho-cube-planner Commit: 386a2ff7f47faca6a402c1afcfb0ab42c06e59ae Parents: 05ce035 Author: Zhong <[email protected]> Authored: Mon Aug 28 11:34:17 2017 +0800 Committer: Zhong <[email protected]> Committed: Fri Sep 8 11:33:04 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 2 +- .../cube/cuboid/TreeCuboidSchedulerManager.java | 102 +++++++++ .../inmemcubing/AbstractInMemCubeBuilder.java | 18 +- .../ConsumeBlockingQueueController.java | 84 ++++++++ .../cube/inmemcubing/DoggedCubeBuilder.java | 117 ++-------- .../cube/inmemcubing/InMemCubeBuilder.java | 87 ++------ .../InMemCubeBuilderInputConverter.java | 149 ------------- .../kylin/cube/inmemcubing/InputConverter.java | 69 ++++++ .../cube/inmemcubing/InputConverterUnit.java | 33 +++ .../InputConverterUnitForBaseCuboid.java | 55 +++++ .../InputConverterUnitForRawData.java | 165 ++++++++++++++ .../RecordConsumeBlockingQueueController.java | 91 ++++++++ .../org/apache/kylin/gridtable/GTRecord.java | 5 + .../engine/mr/common/AbstractHadoopJob.java | 12 +- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/steps/InMemCuboidMapper.java | 129 +++-------- .../engine/mr/steps/InMemCuboidMapperBase.java | 214 +++++++++++++++++++ .../apache/kylin/engine/spark/SparkCubing.java | 164 +++++++------- .../ITDoggedCubeBuilderStressTest.java | 3 +- .../inmemcubing/ITDoggedCubeBuilderTest.java | 3 +- .../inmemcubing/ITInMemCubeBuilderTest.java | 14 +- 21 files changed, 1008 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index d217dcd..05fb5be 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -330,7 +330,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } public Set<Long> getCuboidsByMode(String cuboidModeName) { - return getCuboidsByMode(CuboidModeEnum.getByModeName(cuboidModeName)); + return getCuboidsByMode(cuboidModeName == null ? null : CuboidModeEnum.getByModeName(cuboidModeName)); } public Set<Long> getCuboidsByMode(CuboidModeEnum cuboidMode) { http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java new file mode 100644 index 0000000..5e8d965 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.cuboid; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.cachesync.Broadcaster; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TreeCuboidSchedulerManager { + private static ConcurrentMap<String, TreeCuboidScheduler> cache = Maps.newConcurrentMap(); + + private class TreeCuboidSchedulerSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + cache.clear(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + cache.remove(cacheKey); + } + } + + public TreeCuboidSchedulerManager() { + Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()) + .registerListener(new TreeCuboidSchedulerSyncListener(), "cube"); + } + + private static TreeCuboidSchedulerManager instance = new TreeCuboidSchedulerManager(); + + public static TreeCuboidSchedulerManager getInstance() { + return instance; + } + + /** + * + * @param cubeName + * @return null if the cube has no pre-built cuboids + */ + public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) { + TreeCuboidScheduler result = cache.get(cubeName); + if (result == null) { + CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + if (cubeInstance == null) { + return null; + } + TreeCuboidScheduler treeCuboidScheduler = getTreeCuboidScheduler(cubeInstance.getDescriptor(), + cubeManager.getCube(cubeName).getCuboids()); + if (treeCuboidScheduler == null) { + return null; + } + cache.put(cubeName, treeCuboidScheduler); + result = treeCuboidScheduler; + } + return result; + } + + public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) { + if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) { + return null; + } + return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt); + } + + public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds, + Map<Long, Long> cuboidsWithRowCnt) { + if (cuboidIds == null || cuboidsWithRowCnt == null) { + return null; + } + TreeCuboidScheduler treeCuboidScheduler = new TreeCuboidScheduler(cubeDesc, cuboidIds, + new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt)); + return treeCuboidScheduler; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index ae38261..df1fa7a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -19,7 +19,6 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -51,11 +50,6 @@ abstract public class AbstractInMemCubeBuilder { protected int taskThreadCount = 1; protected int reserveMemoryMB = 100; - // @Deprecated - // public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - // this(cubeDesc.getInitialCuboidScheduler(), cubeDesc, flatDesc, dictionaryMap); - // } - protected AbstractInMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (cuboidScheduler == null) @@ -83,12 +77,17 @@ abstract public class AbstractInMemCubeBuilder { return this.reserveMemoryMB; } - public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) { + public Runnable buildAsRunnable(final BlockingQueue<String[]> input, final ICuboidWriter output) { + return buildAsRunnable(input, new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap), output); + } + + public <T> Runnable buildAsRunnable(final BlockingQueue<T> input, final InputConverterUnit<T> inputConverterUnit, + final ICuboidWriter output) { return new Runnable() { @Override public void run() { try { - build(input, output); + build(input, inputConverterUnit, output); } catch (IOException e) { throw new RuntimeException(e); } @@ -96,7 +95,8 @@ abstract public class AbstractInMemCubeBuilder { }; } - abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException; + abstract public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, + ICuboidWriter output) throws IOException; protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException { long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java new file mode 100644 index 0000000..a9e55f7 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Lists; + +public class ConsumeBlockingQueueController<T> implements Iterator<T> { + public final static int DEFAULT_BATCH_SIZE = 1000; + + private volatile boolean hasException = false; + private final BlockingQueue<T> input; + private final int batchSize; + private final List<T> batchBuffer; + private Iterator<T> internalIT; + + private AtomicInteger outputRowCount = new AtomicInteger(); + + public ConsumeBlockingQueueController(BlockingQueue<T> input, int batchSize) { + this.input = input; + this.batchSize = batchSize; + this.batchBuffer = Lists.newArrayListWithExpectedSize(batchSize); + this.internalIT = batchBuffer.iterator(); + } + + @Override + public boolean hasNext() { + if (hasException) { + return false; + } + if (internalIT.hasNext()) { + return true; + } else { + batchBuffer.clear(); + try { + batchBuffer.add(input.take()); + outputRowCount.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + outputRowCount.addAndGet(input.drainTo(batchBuffer, batchSize - 1)); + internalIT = batchBuffer.iterator(); + } + return true; + } + + @Override + public T next() { + return internalIT.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + public void findException() { + hasException = true; + } + + public long getOutputRowCount() { + return outputRowCount.get(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index dd92a2b..ccd7137 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -20,21 +20,17 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.TimeUnit; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequestBuilder; @@ -55,7 +51,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class); private int splitRowThreshold = Integer.MAX_VALUE; - private int unitRows = 1000; + private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE; public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { @@ -72,8 +68,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { } @Override - public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException { - new BuildOnce().build(input, output); + public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) + throws IOException { + new BuildOnce().build(input, inputConverterUnit, output); } private class BuildOnce { @@ -81,7 +78,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { BuildOnce() { } - public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException { + public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) + throws IOException { final List<SplitThread> splits = new ArrayList<SplitThread>(); final Merger merger = new Merger(); @@ -89,32 +87,23 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { logger.info("Dogged Cube Build start"); try { - SplitThread last = null; - boolean eof = false; + while (true) { + SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController + .getQueueController(inputConverterUnit, input, unitRows)); + splits.add(last); - while (!eof) { + last.start(); + logger.info("Split #" + splits.size() + " kickoff"); - if (last != null && shouldCutSplit(splits)) { - cutSplit(last); - last = null; - } + // Build splits sequentially + last.join(); checkException(splits); - - if (last == null) { - last = new SplitThread(); - splits.add(last); - last.start(); - logger.info("Split #" + splits.size() + " kickoff"); + if (last.inputController.ifEnd()) { + break; } - - eof = feedSomeInput(input, last, unitRows); } - for (SplitThread split : splits) { - split.join(); - } - checkException(splits); logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms"); merger.mergeAndOutput(splits, output); @@ -202,81 +191,18 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0)); } } - - private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) { - try { - int i = 0; - while (i < n) { - List<String> record = input.take(); - i++; - - while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) { - if (split.exception != null) - return true; // got some error - } - split.inputRowCount++; - - if (record == null || record.isEmpty()) { - return true; - } - } - return false; - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - private void cutSplit(SplitThread last) { - try { - // signal the end of input - while (last.isAlive()) { - if (last.inputQueue.offer(Collections.<String> emptyList())) { - break; - } - Thread.sleep(1000); - } - - // wait cuboid build done - last.join(); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - private boolean shouldCutSplit(List<SplitThread> splits) { - int systemAvailMB = MemoryBudgetController.getSystemAvailMB(); - int nSplit = splits.size(); - long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount; - - logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold"); - - if (splitRowCount >= splitRowThreshold) { - logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold); - return true; - } - - if (systemAvailMB <= reserveMemoryMB * 1.5) { - logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + "*1.5 MB"); - return true; - } - - return false; - } } private class SplitThread extends Thread { - final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16); + final RecordConsumeBlockingQueueController<?> inputController; final InMemCubeBuilder builder; ConcurrentNavigableMap<Long, CuboidResult> buildResult; - long inputRowCount = 0; RuntimeException exception; - public SplitThread() { + public SplitThread(final int num, final RecordConsumeBlockingQueueController<?> inputController) { + super("SplitThread" + num); + this.inputController = inputController; this.builder = new InMemCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); this.builder.setConcurrentThreads(taskThreadCount); this.builder.setReserveMemoryMB(reserveMemoryMB); @@ -285,12 +211,13 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { @Override public void run() { try { - buildResult = builder.build(inputQueue); + buildResult = builder.build(inputController); } catch (Exception e) { if (e instanceof RuntimeException) this.exception = (RuntimeException) e; else this.exception = new RuntimeException(e); + inputController.findException(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 684c26b..f63b53f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -87,6 +87,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private CuboidResult baseResult; private Object[] totalSumForSanityCheck; private ICuboidCollector resultCollector; + private boolean ifBaseCuboidCollected = true; public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { @@ -121,8 +122,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } @Override - public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException { - ConcurrentNavigableMap<Long, CuboidResult> result = build(input); + public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) + throws IOException { + ConcurrentNavigableMap<Long, CuboidResult> result = build( + RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input)); try { for (CuboidResult cuboidResult : result.values()) { outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output); @@ -133,7 +136,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } } - public ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException { + public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input) + throws IOException { + if (input.inputConverterUnit instanceof InputConverterUnitForBaseCuboid) { + ifBaseCuboidCollected = false; + } final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>(); build(input, new ICuboidCollector() { @Override @@ -150,7 +157,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { void collect(CuboidResult result); } - private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException { + private <T> void build(RecordConsumeBlockingQueueController<T> input, ICuboidCollector collector) + throws IOException { long startTime = System.currentTimeMillis(); logger.info("In Mem Cube Build start, " + cubeDesc.getName()); @@ -326,7 +334,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { memBudget = new MemoryBudgetController(budget); } - private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException { + private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input) throws IOException { long startTime = System.currentTimeMillis(); logger.info("Calculating base cuboid " + baseCuboidId); @@ -356,10 +364,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB); logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB"); - return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0); + return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, ifBaseCuboidCollected); } private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) { + return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, true); + } + + private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB, + boolean ifCollect) { if (aggrCacheMB <= 0 && baseResult != null) { aggrCacheMB = (int) Math.round(// (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) // @@ -369,7 +382,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB); taskCuboidCompleted.incrementAndGet(); - resultCollector.collect(result); + if (ifCollect) { + resultCollector.collect(result); + } return result; } @@ -508,62 +523,4 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { return comp < 0 ? -1 : (comp > 0 ? 1 : 0); } } - - // ============================================================================ - - private class InputConverter implements IGTScanner { - GTInfo info; - GTRecord record; - BlockingQueue<List<String>> input; - final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter; - - public InputConverter(GTInfo info, BlockingQueue<List<String>> input) { - this.info = info; - this.input = input; - this.record = new GTRecord(info); - this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, dictionaryMap, info); - } - - @Override - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - - List<String> currentObject = null; - - @Override - public boolean hasNext() { - try { - currentObject = input.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - return currentObject != null && currentObject.size() > 0; - } - - @Override - public GTRecord next() { - if (currentObject.size() == 0) - throw new IllegalStateException(); - - inMemCubeBuilderInputConverter.convert(currentObject, record); - return record; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void close() throws IOException { - } - - @Override - public GTInfo getInfo() { - return info; - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java deleted file mode 100644 index 6dd20d8..0000000 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.cube.inmemcubing; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -public class InMemCubeBuilderInputConverter { - - @SuppressWarnings("unused") - private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class); - - public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); - - private final CubeJoinedFlatTableEnrich flatDesc; - private final MeasureDesc[] measureDescs; - private final MeasureIngester<?>[] measureIngesters; - private final int measureCount; - private final Map<TblColRef, Dictionary<String>> dictionaryMap; - private final GTInfo gtInfo; - protected List<byte[]> nullBytes; - - public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) { - this.gtInfo = gtInfo; - this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc); - this.measureCount = cubeDesc.getMeasures().size(); - this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); - this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - this.dictionaryMap = dictionaryMap; - initNullBytes(cubeDesc); - } - - public final GTRecord convert(List<String> row) { - final GTRecord record = new GTRecord(gtInfo); - convert(row, record); - return record; - } - - public final void convert(List<String> row, GTRecord record) { - Object[] dimensions = buildKey(row); - Object[] metricsValues = buildValue(row); - Object[] recordValues = new Object[dimensions.length + metricsValues.length]; - System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); - System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length); - record.setValues(recordValues); - } - - private Object[] buildKey(List<String> row) { - int keySize = flatDesc.getRowKeyColumnIndexes().length; - Object[] key = new Object[keySize]; - - for (int i = 0; i < keySize; i++) { - key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]); - if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) { - key[i] = null; - } - } - - return key; - } - - private Object[] buildValue(List<String> row) { - Object[] values = new Object[measureCount]; - for (int i = 0; i < measureCount; i++) { - values[i] = buildValueOf(i, row); - } - return values; - } - - private Object buildValueOf(int idxOfMeasure, List<String> row) { - MeasureDesc measure = measureDescs[idxOfMeasure]; - FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure]; - - int paramCount = function.getParameterCount(); - String[] inputToMeasure = new String[paramCount]; - - // pick up parameter values - ParameterDesc param = function.getParameter(); - int paramColIdx = 0; // index among parameters of column type - for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { - String value; - if (function.isCount()) { - value = "1"; - } else if (param.isColumnType()) { - value = row.get(colIdxOnFlatTable[paramColIdx++]); - } else { - value = param.getValue(); - } - inputToMeasure[i] = value; - } - - return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); - } - - private void initNullBytes(CubeDesc cubeDesc) { - nullBytes = Lists.newArrayList(); - nullBytes.add(HIVE_NULL); - String[] nullStrings = cubeDesc.getNullStrings(); - if (nullStrings != null) { - for (String s : nullStrings) { - nullBytes.add(Bytes.toBytes(s)); - } - } - } - - private boolean isNull(byte[] v) { - for (byte[] nullByte : nullBytes) { - if (Bytes.equals(v, nullByte)) - return true; - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java new file mode 100644 index 0000000..664f784 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.IGTScanner; + +public class InputConverter<T> implements IGTScanner { + private GTInfo info; + private GTRecord record; + private RecordConsumeBlockingQueueController<T> inputController; + + public InputConverter(GTInfo info, RecordConsumeBlockingQueueController<T> inputController) { + this.info = info; + this.inputController = inputController; + this.record = new GTRecord(info); + } + + @Override + public Iterator<GTRecord> iterator() { + return new Iterator<GTRecord>() { + + @Override + public boolean hasNext() { + return inputController.hasNext(); + } + + @Override + public GTRecord next() { + inputController.inputConverterUnit.convert(inputController.next(), record); + return record; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void close() throws IOException { + } + + @Override + public GTInfo getInfo() { + return info; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java new file mode 100644 index 0000000..fe32937 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import org.apache.kylin.gridtable.GTRecord; + +public interface InputConverterUnit<T> { + public void convert(T currentObject, GTRecord record); + + public boolean ifEnd(T currentObject); + + public boolean ifCut(T currentObject); + + public T getEmptyUnit(); + + public T getCutUnit(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java new file mode 100644 index 0000000..1bf02ed --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.gridtable.GTRecord; + +public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteArray> { + + public static final ByteArray EMPTY_ROW = ByteArray.EMPTY; + public static final ByteArray CUT_ROW = new ByteArray(0); + + public void convert(ByteArray currentObject, GTRecord record) { + record.loadColumns(currentObject.asBuffer()); + } + + public boolean ifEnd(ByteArray currentObject) { + if (currentObject == null) { + return false; + } + return currentObject == EMPTY_ROW; + } + + public ByteArray getEmptyUnit() { + return EMPTY_ROW; + } + + public ByteArray getCutUnit() { + return CUT_ROW; + } + + @Override + public boolean ifCut(ByteArray currentObject) { + if (currentObject == null) { + return false; + } + return currentObject == CUT_ROW; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java new file mode 100644 index 0000000..465cb3b --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.cube.inmemcubing; + +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.ParameterDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + */ +public class InputConverterUnitForRawData implements InputConverterUnit<String[]> { + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class); + + public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); + public static final String[] EMPTY_ROW = new String[0]; + public static final String[] CUT_ROW = { "" }; + + private final CubeJoinedFlatTableEnrich flatDesc; + private final MeasureDesc[] measureDescs; + private final MeasureIngester<?>[] measureIngesters; + private final int measureCount; + private final Map<TblColRef, Dictionary<String>> dictionaryMap; + protected List<byte[]> nullBytes; + + public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, + Map<TblColRef, Dictionary<String>> dictionaryMap) { + this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc); + this.measureCount = cubeDesc.getMeasures().size(); + this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); + this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures()); + this.dictionaryMap = dictionaryMap; + initNullBytes(cubeDesc); + } + + public final void convert(String[] row, GTRecord record) { + Object[] dimensions = buildKey(row); + Object[] metricsValues = buildValue(row); + Object[] recordValues = new Object[dimensions.length + metricsValues.length]; + System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); + System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length); + record.setValues(recordValues); + } + + public boolean ifEnd(String[] currentObject) { + if (currentObject == null) { + return false; + } + return currentObject == EMPTY_ROW; + } + + public boolean ifCut(String[] currentObject) { + if (currentObject == null) { + return false; + } + return currentObject == CUT_ROW; + } + + public String[] getEmptyUnit() { + return EMPTY_ROW; + } + + public String[] getCutUnit() { + return CUT_ROW; + } + + private Object[] buildKey(String[] row) { + int keySize = flatDesc.getRowKeyColumnIndexes().length; + Object[] key = new Object[keySize]; + + for (int i = 0; i < keySize; i++) { + key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]]; + if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) { + key[i] = null; + } + } + + return key; + } + + private Object[] buildValue(String[] row) { + Object[] values = new Object[measureCount]; + for (int i = 0; i < measureCount; i++) { + values[i] = buildValueOf(i, row); + } + return values; + } + + private Object buildValueOf(int idxOfMeasure, String[] row) { + MeasureDesc measure = measureDescs[idxOfMeasure]; + FunctionDesc function = measure.getFunction(); + int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure]; + + int paramCount = function.getParameterCount(); + String[] inputToMeasure = new String[paramCount]; + + // pick up parameter values + ParameterDesc param = function.getParameter(); + int paramColIdx = 0; // index among parameters of column type + for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { + String value; + if (function.isCount()) { + value = "1"; + } else if (param.isColumnType()) { + value = row[colIdxOnFlatTable[paramColIdx++]]; + } else { + value = param.getValue(); + } + inputToMeasure[i] = value; + } + + return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); + } + + private void initNullBytes(CubeDesc cubeDesc) { + nullBytes = Lists.newArrayList(); + nullBytes.add(HIVE_NULL); + String[] nullStrings = cubeDesc.getNullStrings(); + if (nullStrings != null) { + for (String s : nullStrings) { + nullBytes.add(Bytes.toBytes(s)); + } + } + } + + private boolean isNull(byte[] v) { + for (byte[] nullByte : nullBytes) { + if (Bytes.equals(v, nullByte)) + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java new file mode 100644 index 0000000..49cbe1f --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import java.util.concurrent.BlockingQueue; + +public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> { + + public final InputConverterUnit<T> inputConverterUnit; + + private RecordConsumeBlockingQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize) { + super(input, batchSize); + this.inputConverterUnit = inputConverterUnit; + } + + private T currentObject = null; + private volatile boolean ifEnd = false; + private volatile boolean cut = false; + private long outputRowCountCut = 0L; + + @Override + public boolean hasNext() { + if (currentObject != null) { + return hasNext(currentObject); + } + if (!super.hasNext()) { + return false; + } + currentObject = super.next(); + return hasNext(currentObject); + } + + @Override + public T next() { + if (ifEnd()) + throw new IllegalStateException(); + + T result = currentObject; + currentObject = null; + return result; + } + + public boolean ifEnd() { + return ifEnd; + } + + private boolean hasNext(T object) { + if (inputConverterUnit.ifEnd(object)) { + ifEnd = true; + return false; + }else if(cut){ + return false; + }else if(inputConverterUnit.ifCut(object)){ + return false; + } + return true; + } + + public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){ + return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE); + } + + public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){ + return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize); + } + + public void forceCutPipe() { + cut = true; + outputRowCountCut = getOutputRowCount(); + } + + public long getOutputRowCountAfterCut() { + return getOutputRowCount() - outputRowCountCut; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index 3e62ea7..ab02030 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -281,6 +281,11 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable { loadColumns(info.colBlocks[c], buf); } + /** change pointers to point to data in given buffer, UNLIKE deserialize */ + public void loadColumns(ByteBuffer buf) { + loadColumns(info.colAll, buf); + } + /** * Change pointers to point to data in given buffer, UNLIKE deserialize * @param selectedCols positions of column to load http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index f9d9808..a608c40 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -460,9 +460,19 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException { + attachSegmentMetadata(segment, conf, true, true); + } + + protected void attachSegmentMetadata(CubeSegment segment, Configuration conf, boolean ifDictIncluded, + boolean ifStatsIncluded) throws IOException { Set<String> dumpList = new LinkedHashSet<>(); dumpList.addAll(collectCubeMetadata(segment.getCubeInstance())); - dumpList.addAll(segment.getDictionaryPaths()); + if (ifDictIncluded) { + dumpList.addAll(segment.getDictionaryPaths()); + } + if (ifStatsIncluded) { + dumpList.add(segment.getStatisticsResourcePath()); + } dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf); } http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 0cb23ac..74a9d09 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -29,6 +29,7 @@ public interface BatchConstants { * ConFiGuration entry names for MR jobs */ + String CFG_CUBOID_MODE = "cuboid.mode"; String CFG_CUBE_NAME = "cube.name"; String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; String CFG_CUBE_SEGMENT_ID = "cube.segment.id"; http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 5511414..30f4619 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -19,133 +19,60 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.cube.inmemcubing.InputConverterUnit; +import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData; import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; -import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; -/** - */ -public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> { - - private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapper.class); +public class InMemCuboidMapper<KEYIN> + extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> { - private CubeInstance cube; - private CubeDesc cubeDesc; - private CubeSegment cubeSegment; - private IMRTableInputFormat flatTableInputFormat; - private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64); - private Future<?> future; + private IMRInput.IMRTableInputFormat flatTableInputFormat; @Override protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); + super.setup(context); - Configuration conf = context.getConfiguration(); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); - String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); - cube = CubeManager.getInstance(config).getCube(cubeName); - cubeDesc = cube.getDescriptor(); - String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); - cubeSegment = cube.getSegmentById(segmentID); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat(); - IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); - - Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); - - // dictionary - for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) { - Dictionary<?> dict = cubeSegment.getDictionary(col); - if (dict == null) { - logger.warn("Dictionary for " + col + " was not found."); - } - - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } - - int taskCount = config.getCubeAlgorithmInMemConcurrentThreads(); - DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeSegment.getCuboidScheduler(), flatDesc, - dictionaryMap); - cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration())); - cubeBuilder.setConcurrentThreads(taskCount); - - ExecutorService executorService = Executors.newSingleThreadExecutor(); - future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); - - } - - private int calculateReserveMB(Configuration configuration) { - int sysAvailMB = MemoryBudgetController.getSystemAvailMB(); - int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100); - int sysReserve = Math.max(sysAvailMB / 10, 100); - int reserveMB = mrReserve + sysReserve; - logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)"); - return reserveMB; } @Override - public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { - // put each row to the queue - Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); + protected InputConverterUnit<String[]> getInputConverterUnit() { + Preconditions.checkNotNull(cubeDesc); + Preconditions.checkNotNull(dictionaryMap); + return new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap); - for(String[] row: rowCollection) { - List<String> rowAsList = Arrays.asList(row); - while (!future.isDone()) { - if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { - break; - } - } - } } @Override - protected void doCleanup(Context context) throws IOException, InterruptedException { - logger.info("Totally handled " + mapCounter + " records!"); - - while (!future.isDone()) { - if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { - break; - } - } - - try { - future.get(); - } catch (Exception e) { - throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); - } - queue.clear(); + protected String[] getRecordFromKeyValue(KEYIN key, Object value) { + return flatTableInputFormat.parseMapperInput(value).iterator().next(); } + @Override + protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, + int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<String[]> inputConverterUnit) { + AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap); + cubeBuilder.setReserveMemoryMB(reserveMemoryMB); + + ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build()); + return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, + new MapContextGTRecordWriter(context, cubeDesc, cubeSegment))); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java new file mode 100644 index 0000000..0642552 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler; +import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager; +import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController; +import org.apache.kylin.cube.inmemcubing.InputConverterUnit; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +/** + */ +public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { + + private static final Logger logger = LoggerFactory.getLogger(InMemCuboidMapperBase.class); + + private int reserveMemoryMB; + private int nSplit = 1; + private int countOfLastSplit = 0; + private int counter = 0; + private int splitRowThreshold = Integer.MAX_VALUE; + private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE; + + protected CubeInstance cube; + protected CubeDesc cubeDesc; + protected CubeSegment cubeSegment; + protected Map<TblColRef, Dictionary<String>> dictionaryMap; + protected IJoinedFlatTableDesc flatDesc; + + protected BlockingQueue<T> queue = new LinkedBlockingQueue<>(2000); + protected InputConverterUnit<T> inputConverterUnit; + private Future<?> future; + + protected abstract InputConverterUnit<T> getInputConverterUnit(); + + protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, int reserveMemoryMB, // + CuboidScheduler cuboidScheduler, InputConverterUnit<T> inputConverterUnit); + + protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value); + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + + Configuration conf = context.getConfiguration(); + + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + cube = CubeManager.getInstance(config).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); + cubeSegment = cube.getSegmentById(segmentID); + flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + + dictionaryMap = Maps.newHashMap(); + + // dictionary + for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) { + Dictionary<?> dict = cubeSegment.getDictionary(col); + if (dict == null) { + logger.warn("Dictionary for " + col + " was not found."); + } + + dictionaryMap.put(col, cubeSegment.getDictionary(col)); + } + + // check memory more often if a single row is big + if (cubeDesc.hasMemoryHungryMeasures()) { + unitRows /= 10; + } + + String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE); + CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, // + CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment)); + if (cuboidScheduler == null) { + cuboidScheduler = new DefaultCuboidScheduler(cubeDesc); + } + + reserveMemoryMB = calculateReserveMB(conf); + inputConverterUnit = getInputConverterUnit(); + future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler, inputConverterUnit); + } + + private int calculateReserveMB(Configuration configuration) { + int sysAvailMB = MemoryBudgetController.getSystemAvailMB(); + int mrReserve = configuration.getInt("mapreduce.task.io.sort.mb", 100); + int sysReserve = Math.max(sysAvailMB / 10, 100); + int reserveMB = mrReserve + sysReserve; + logger.info("Reserve " + reserveMB + " MB = " + mrReserve + " (MR reserve) + " + sysReserve + " (SYS reserve)"); + return reserveMB; + } + + @Override + public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { + // put each row to the queue + T row = getRecordFromKeyValue(key, value); + + if (offer(context, row, 1, TimeUnit.MINUTES, 60)) { + counter++; + countOfLastSplit++; + if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handled " + counter + " records, internal queue size = " + queue.size()); + } + } else { + throw new IOException("Failed to offer row to internal queue due to queue full!"); + } + + if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) { + if (offer(context, inputConverterUnit.getCutUnit(), 1, TimeUnit.MINUTES, 60)) { + countOfLastSplit = 0; + } else { + throw new IOException("Failed to offer row to internal queue due to queue full!"); + } + nSplit++; + } + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + logger.info("Totally handled " + mapCounter + " records!"); + + while (!future.isDone()) { + if (queue.offer(inputConverterUnit.getEmptyUnit(), 1, TimeUnit.SECONDS)) { + break; + } + } + + futureGet(context); + queue.clear(); + } + + private boolean shouldCutSplit(int nSplit, long splitRowCount) { + int systemAvailMB = MemoryBudgetController.getSystemAvailMB(); + + logger.info(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold"); + + if (splitRowCount >= splitRowThreshold) { + logger.info("Split cut due to hitting splitRowThreshold " + splitRowThreshold); + return true; + } + + if (systemAvailMB <= reserveMemoryMB) { + logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB"); + return true; + } + + return false; + } + + private boolean offer(Context context, T row, long timeout, TimeUnit unit, int nRound) throws IOException, InterruptedException { + while (nRound > 0) { + if (queue.offer(row, timeout, unit)) { + return true; + } + if (future.isDone()) { + futureGet(context); + throw new IOException("Failed to build cube in mapper due to cubing thread exit unexpectedly"); + } + nRound--; + } + return false; + } + + private void futureGet(Context context) throws IOException { + try { + future.get(); + } catch (Exception e) { + throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index fd53c5a..d7bc172 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -21,8 +21,6 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -62,6 +60,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; @@ -249,7 +248,8 @@ public class SparkCubing extends AbstractApplication { } } - private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception { + private Map<Long, HLLCounter> sampling(final JavaRDD<String[]> rowJavaRDD, final String cubeName, String segmentId) + throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); @@ -286,52 +286,55 @@ public class SparkCubing extends AbstractApplication { row_hashcodes[i] = new ByteArray(); } - final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() { + final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, + new Function2<HashMap<Long, HLLCounter>, String[], HashMap<Long, HLLCounter>>() { - final HashFunction hashFunction = Hashing.murmur3_128(); + final HashFunction hashFunction = Hashing.murmur3_128(); - @Override - public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception { - for (int i = 0; i < nRowKey; i++) { - Hasher hc = hashFunction.newHasher(); - String colValue = v2.get(rowKeyColumnIndexes[i]); - if (colValue != null) { - row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); - } else { - row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); - } - } + @Override + public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, String[] v2) throws Exception { + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hashFunction.newHasher(); + String colValue = v2[rowKeyColumnIndexes[i]]; + if (colValue != null) { + row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } - for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) { - Hasher hc = hashFunction.newHasher(); - HLLCounter counter = v1.get(entry.getKey()); - final Integer[] cuboidBitSet = entry.getValue(); - for (int position = 0; position < cuboidBitSet.length; position++) { - hc.putBytes(row_hashcodes[cuboidBitSet[position]].array()); + for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) { + Hasher hc = hashFunction.newHasher(); + HLLCounter counter = v1.get(entry.getKey()); + final Integer[] cuboidBitSet = entry.getValue(); + for (int position = 0; position < cuboidBitSet.length; position++) { + hc.putBytes(row_hashcodes[cuboidBitSet[position]].array()); + } + counter.add(hc.hash().asBytes()); + } + return v1; + } + }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() { + @Override + public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) + throws Exception { + Preconditions.checkArgument(v1.size() == v2.size()); + Preconditions.checkArgument(v1.size() > 0); + for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) { + final HLLCounter counter1 = entry.getValue(); + final HLLCounter counter2 = v2.get(entry.getKey()); + counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null")); + } + return v1; } - counter.add(hc.hash().asBytes()); - } - return v1; - } - }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() { - @Override - public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception { - Preconditions.checkArgument(v1.size() == v2.size()); - Preconditions.checkArgument(v1.size() > 0); - for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) { - final HLLCounter counter1 = entry.getValue(); - final HLLCounter counter2 = v2.get(entry.getKey()); - counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null")); - } - return v1; - } - }); + }); return samplingResult; } /** return hfile location */ - private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception { + private String build(JavaRDD<String[]> javaRDD, final String cubeName, final String segmentId, + final byte[][] splitKeys) throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); CubeDesc cubeDesc = cubeInstance.getDescriptor(); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); @@ -365,36 +368,41 @@ public class SparkCubing extends AbstractApplication { } } - final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() { + final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom() + .mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<String[]>>, byte[], byte[]>() { - @Override - public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception { - long t = System.currentTimeMillis(); - prepare(); + @Override + public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<String[]>> listIterator) + throws Exception { + long t = System.currentTimeMillis(); + prepare(); - final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getCube(cubeName); - LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue(); - System.out.println("load properties finished"); - IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); + LinkedBlockingQueue<String[]> blockingQueue = new LinkedBlockingQueue(); + System.out.println("load properties finished"); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder( cubeSegment.getCuboidScheduler(), flatDesc, dictionaryMap); - final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap)); - Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); - try { - while (listIterator.hasNext()) { - for (List<String> row : listIterator.next()) { - blockingQueue.put(row); + final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter( + new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap)); + Executors.newCachedThreadPool() + .submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); + try { + while (listIterator.hasNext()) { + for (String[] row : listIterator.next()) { + blockingQueue.put(row); + } + } + blockingQueue.put(InputConverterUnitForRawData.EMPTY_ROW); + } catch (Exception e) { + throw new RuntimeException(e); } + System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms"); + return sparkCuboidWriter.getResult().iterator(); } - blockingQueue.put(Collections.<String> emptyList()); - } catch (Exception e) { - throw new RuntimeException(e); - } - System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms"); - return sparkCuboidWriter.getResult().iterator(); - } - }); + }); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier()); @@ -562,22 +570,22 @@ public class SparkCubing extends AbstractApplication { setupClasspath(sc, confPath); intermediateTable.cache(); writeDictionary(intermediateTable, cubeName, segmentId); - final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() { - @Override - public List<String> call(Row v1) throws Exception { - ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size()); - for (int i = 0; i < v1.size(); i++) { - final Object o = v1.get(i); - if (o != null) { - result.add(o.toString()); - } else { - result.add(null); + final JavaRDD<String[]> rowJavaRDD = intermediateTable.javaRDD() + .map(new org.apache.spark.api.java.function.Function<Row, String[]>() { + @Override + public String[] call(Row v1) throws Exception { + String[] result = new String[v1.size()]; + for (int i = 0; i < v1.size(); i++) { + final Object o = v1.get(i); + if (o != null) { + result[i] = o.toString(); + } else { + result[i] = null; + } + } + return result; } - } - return result; - - } - }); + }); final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId); final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult); http://git-wip-us.apache.org/repos/asf/kylin/blob/386a2ff7/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java index 5e5b16a..fa2d792 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java @@ -19,7 +19,6 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -79,7 +78,7 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { @Test public void test() throws Exception { - ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000); + ArrayBlockingQueue<String[]> queue = new ArrayBlockingQueue<String[]>(1000); ExecutorService executorService = Executors.newSingleThreadExecutor(); long randSeed = System.currentTimeMillis();
