Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-1126 8796fa37f -> 6476549c9
KYLIN-1137 TopN measure need support dictionary merge Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6476549c Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6476549c Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6476549c Branch: refs/heads/KYLIN-1126 Commit: 6476549c9690da8eeefa4fe6c0ee476c3d9bd561 Parents: 8796fa3 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Nov 11 19:58:02 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Nov 11 19:58:02 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/topn/Counter.java | 4 + .../org/apache/kylin/cube/model/CubeDesc.java | 10 +++ .../engine/mr/steps/InMemCuboidMapper.java | 26 ++----- .../engine/mr/steps/MergeCuboidMapper.java | 79 +++++++++++++++++++- .../engine/mr/steps/MergeDictionaryStep.java | 17 ++--- 5 files changed, 106 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java index 2bca4df..461e083 100644 --- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java +++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java @@ -50,6 +50,10 @@ public class Counter<T> implements Externalizable { public T getItem() { return item; } + + public void setItem(T item) { + this.item = item; + } public double getCount() { return count; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 160090a..2709247 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -847,4 +847,14 @@ public class CubeDesc extends RootPersistentEntity { public LinkedHashSet<TblColRef> getMeasureDisplayColumns() { return measureDisplayColumns; } + + + public boolean hasMeasureUsingDictionary() { + for (MeasureDesc measureDesc : this.getMeasures()) { + if (measureDesc.getFunction().isTopN()) + return true; + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 2bf627b..d724c76 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -65,26 +65,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap(); - for (DimensionDesc dim : cubeDesc.getDimensions()) { - // dictionary - for (TblColRef col : dim.getColumnRefs()) { - if (cubeDesc.getRowkey().isUseDictionary(col)) { - Dictionary<?> dict = cubeSegment.getDictionary(col); - if (dict == null) { - logger.warn("Dictionary for " + col + " was not found."); - } - - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } - } - } - - for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { - if (measureDesc.getFunction().isTopN()) { - List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs(); - TblColRef col = colRefs.get(colRefs.size() - 1); - dictionaryMap.put(col, cubeSegment.getDictionary(col)); + // dictionary + for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) { + Dictionary<?> dict = cubeSegment.getDictionary(col); + if (dict == null) { + logger.warn("Dictionary for " + col + " was not found."); } + + dictionaryMap.put(col, cubeSegment.getDictionary(col)); } DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 0b68e59..b3b38dc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -19,13 +19,18 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.Lists; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.topn.Counter; +import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.SplittedBytes; @@ -43,6 +48,9 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.measure.MeasureAggregators; +import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -68,6 +76,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private RowKeyEncoderProvider rowKeyEncoderProvider; private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); + private List<MeasureDesc> measuresDescs; + private MeasureCodec codec; + private Object[] measureObjs; + private Integer[] measureIdxUsingDict; + private ByteBuffer valueBuf; + private Text outputValue; private Boolean checkNeedMerging(TblColRef col) throws IOException { Boolean ret = dictsNeedMerging.get(col); @@ -108,6 +122,22 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); + + if (cubeDesc.hasMeasureUsingDictionary()) { + measuresDescs = cubeDesc.getMeasures(); + codec = new MeasureCodec(measuresDescs); + measureObjs = new Object[measuresDescs.size()]; + List<Integer> measuresUsingDict = Lists.newArrayList(); + for (int i = 0; i < measuresDescs.size(); i++) { + if (measuresDescs.get(i).getFunction().isTopN()) { + // so far only TopN uses dic + measuresUsingDict.add(i); + } + } + measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]); + valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + outputValue = new Text(); + } } private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); @@ -200,6 +230,53 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); outputKey.set(newKeyBuf.array(), 0, fullKeySize); - context.write(outputKey, value); + + // encode measure if it uses dictionary + if (cubeDesc.hasMeasureUsingDictionary()) { + codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); + bufOffset = 0; + for (int idx : measureIdxUsingDict) { + // only TopN measure uses dic + TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; + + MeasureDesc measureDesc = measuresDescs.get(idx); + String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn(); + TblColRef col = cubeDesc.findColumnRef(cubeDesc.getFactTable(), displayCol); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); + Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + } + + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); + } + + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId())); + bufOffset += mergedDict.getSizeOfId(); + } + } + + valueBuf.clear(); + codec.encode(measureObjs, valueBuf); + outputValue.set(valueBuf.array(), 0, valueBuf.position()); + context.write(outputKey, outputValue); + } else { + context.write(outputKey, value); + } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index 2db4ce7..b73fda4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -110,16 +110,13 @@ public class MergeDictionaryStep extends AbstractExecutable { DictionaryManager dictMgr = DictionaryManager.getInstance(conf); CubeDesc cubeDesc = cube.getDescriptor(); - for (DimensionDesc dim : cubeDesc.getDimensions()) { - for (TblColRef col : dim.getColumnRefs()) { - if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable(); - if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { - colsNeedMeringDict.add(col); - } else { - colsNeedCopyDict.add(col); - } - } + + for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) { + String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), "true", col).getTable(); + if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { + colsNeedMeringDict.add(col); + } else { + colsNeedCopyDict.add(col); } }