Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-1126 03c631095 -> 0cead28c3
KYLIN-1137 fix bug in MergeCuboidMapper Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0cead28c Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0cead28c Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0cead28c Branch: refs/heads/KYLIN-1126 Commit: 0cead28c397accccff1d564b1a48109c35a02e4d Parents: 03c6310 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Nov 12 16:05:05 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Nov 12 16:05:05 2015 +0800 ---------------------------------------------------------------------- .../kylin/job/BuildCubeWithEngineTest.java | 6 +- .../engine/mr/steps/MergeCuboidMapper.java | 73 ++++++++++---------- 2 files changed, 41 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0cead28c/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java index e1f12da..04cb01d 100644 --- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java @@ -62,7 +62,7 @@ public class BuildCubeWithEngineTest { private CubeManager cubeManager; private DefaultScheduler scheduler; protected ExecutableManager jobService; - private static boolean fastBuildMode = true; + private static boolean fastBuildMode = false; private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class); @@ -87,8 +87,8 @@ public class BuildCubeWithEngineTest { ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); String fastModeStr = System.getProperty("fastBuildMode"); - if (fastModeStr != null && fastModeStr.equalsIgnoreCase("false")) { - fastBuildMode = false; + if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { + fastBuildMode = true; logger.info("Will not use fast build mode"); } else { logger.info("Will use fast build mode"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0cead28c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index d3d0d7c..ac46b99 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -18,14 +18,8 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; @@ -48,12 +42,19 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureAggregators; import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * @author ysong1, honma */ @@ -122,7 +123,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); - + if (cubeDesc.hasMeasureUsingDictionary()) { measuresDescs = cubeDesc.getMeasures(); codec = new MeasureCodec(measuresDescs); @@ -230,7 +231,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); outputKey.set(newKeyBuf.array(), 0, fullKeySize); - // encode measure if it uses dictionary if (cubeDesc.hasMeasureUsingDictionary()) { codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); @@ -241,33 +241,36 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { MeasureDesc measureDesc = measuresDescs.get(idx); String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); - TblColRef col = cubeDesc.findColumnRef(cubeDesc.getFactTable(), displayCol); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); - - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } - - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); + if (StringUtils.isNotEmpty(displayCol)) { + ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol); + TblColRef colRef = new TblColRef(sourceColumn); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); + Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId())); - bufOffset += mergedDict.getSizeOfId(); + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); + } + + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId())); + bufOffset += mergedDict.getSizeOfId(); + } } }