Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 0cead28c3 -> 3d0334fe7
KYLIN-1137 add topN support in MergeCuboidFromStorageMapper for MR_V2 Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3d0334fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3d0334fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3d0334fe Branch: refs/heads/KYLIN-1126 Commit: 3d0334fe73cf922e5365007c295cd118d3f9b0e0 Parents: 0cead28 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Nov 13 16:25:34 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Nov 13 16:25:34 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/topn/Counter.java | 4 - .../mr/steps/MergeCuboidFromStorageMapper.java | 69 +++++++++++++++ .../engine/mr/steps/MergeCuboidMapper.java | 89 +++++++++++--------- 3 files changed, 116 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java index 461e083..31c5ed1 100644 --- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java +++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java @@ -51,10 +51,6 @@ public class Counter<T> implements Externalizable { return item; } - public void setItem(T item) { - this.item = item; - } - public double getCount() { return count; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index 50f3d4c..286ff02 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -21,8 +21,14 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; +import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.topn.Counter; +import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; @@ -45,6 +51,8 @@ import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -81,6 +89,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By private MeasureCodec codec; private ByteArrayWritable outputValue = new ByteArrayWritable(); + private List<MeasureDesc> measuresDescs; + private Integer[] measureIdxUsingDict; + private Boolean checkNeedMerging(TblColRef col) throws IOException { Boolean ret = dictsNeedMerging.get(col); if (ret != null) @@ -119,7 +130,18 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By rowKeySplitter = new 
RowKeySplitter(sourceCubeSegment, 65, 255); rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); + measuresDescs = cubeDesc.getMeasures(); codec = new MeasureCodec(cubeDesc.getMeasures()); + if (cubeDesc.hasMeasureUsingDictionary()) { + List<Integer> measuresUsingDict = Lists.newArrayList(); + for (int i = 0; i < measuresDescs.size(); i++) { + if (measuresDescs.get(i).getFunction().isTopN()) { + // so far only TopN uses dic + measuresUsingDict.add(i); + } + } + measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]); + } } @Override @@ -191,6 +213,11 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); outputKey.set(newKeyBuf.array(), 0, fullKeySize); + // encode measure if it uses dictionary + if (cubeDesc.hasMeasureUsingDictionary()) { + reEncodeMeasure(value); + } + valueBuf.clear(); codec.encode(value, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); @@ -198,4 +225,46 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By context.write(outputKey, outputValue); } + private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { + int bufOffset = 0; + for (int idx : measureIdxUsingDict) { + // only TopN measure uses dic + TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; + + MeasureDesc measureDesc = measuresDescs.get(idx); + String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); + if (StringUtils.isNotEmpty(displayCol)) { + ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol); + TblColRef colRef = new TblColRef(sourceColumn); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); + Dictionary<?> mergedDict = 
dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + } + + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); + } + + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + bufOffset += mergedDict.getSizeOfId(); + } + } + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d0334fe/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index ac46b99..68d1481 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -234,52 +234,57 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { // encode measure if it uses dictionary if (cubeDesc.hasMeasureUsingDictionary()) { codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), measureObjs); - bufOffset = 0; - for (int idx : measureIdxUsingDict) { - // only TopN measure uses dic - TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; - - MeasureDesc measureDesc = measuresDescs.get(idx); - String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); - if (StringUtils.isNotEmpty(displayCol)) { - ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol); - TblColRef colRef = new TblColRef(sourceColumn); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); - - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } + reEncodeMeasure(measureObjs); + valueBuf.clear(); + codec.encode(measureObjs, valueBuf); + outputValue.set(valueBuf.array(), 0, valueBuf.position()); + value = outputValue; + } + + context.write(outputKey, value); + } + + private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { + int bufOffset = 0; + for (int idx : measureIdxUsingDict) { + // only TopN measure uses dic + TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; + + MeasureDesc measureDesc = measuresDescs.get(idx); + String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); + if (StringUtils.isNotEmpty(displayCol)) { + ColumnDesc sourceColumn = 
cubeDesc.getFactTableDesc().findColumnByName(displayCol); + TblColRef colRef = new TblColRef(sourceColumn); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); + Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + } - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId())); - bufOffset += mergedDict.getSizeOfId(); + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); } + + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.getItem().set(newKeyBodyBuf, 
bufOffset, mergedDict.getSizeOfId()); + bufOffset += mergedDict.getSizeOfId(); } } - - valueBuf.clear(); - codec.encode(measureObjs, valueBuf); - outputValue.set(valueBuf.array(), 0, valueBuf.position()); - context.write(outputKey, outputValue); - } else { - context.write(outputKey, value); } + } + }