KYLIN-976 extract MeasureIngester.reEncodeDictionary()
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dac24880 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dac24880 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dac24880 Branch: refs/heads/KYLIN-976 Commit: dac248804b331f491175e06fd9f598235b2c9e6f Parents: ce61309 Author: Li, Yang <yang...@ebay.com> Authored: Fri Nov 27 16:59:51 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Fri Nov 27 16:59:51 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/measure/MeasureIngester.java | 2 + .../org/apache/kylin/measure/MeasureType.java | 3 - .../kylin/measure/basic/BasicMeasureType.java | 7 -- .../kylin/measure/basic/BigDecimalIngester.java | 6 ++ .../kylin/measure/basic/DoubleIngester.java | 6 ++ .../kylin/measure/basic/LongIngester.java | 6 ++ .../kylin/measure/hllc/HLLCMeasureType.java | 11 ++- .../kylin/measure/topn/TopNMeasureType.java | 43 +++++++++--- .../mr/steps/MergeCuboidFromStorageMapper.java | 68 +++++++------------ .../engine/mr/steps/MergeCuboidMapper.java | 71 +++++++------------- 10 files changed, 107 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java index 8d6e601..9c7b406 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java @@ -41,4 +41,6 @@ abstract public class MeasureIngester<V> { } abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap); + + abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 604a7b6..4fe59c0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -21,7 +21,6 @@ package org.apache.kylin.measure; import java.util.List; import java.util.Map; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.measure.basic.BasicMeasureFactory; import org.apache.kylin.measure.hllc.HLLCAggregationFactory; import org.apache.kylin.measure.topn.TopNMeasureFactory; @@ -86,8 +85,6 @@ abstract public class MeasureType { abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc); - abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts); - /* ============================================================================ * Cube Selection * ---------------------------------------------------------------------------- */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java index f6bf090..fe53bab 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -20,7 +20,6 @@ package org.apache.kylin.measure.basic; import java.util.List; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; @@ -126,10 +125,4 @@ public class BasicMeasureType extends MeasureType { return null; } - @Override - public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) { - // TODO Auto-generated method stub - return null; - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java index bb743d6..ea1495c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java @@ -28,6 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef; public class BigDecimalIngester extends MeasureIngester<BigDecimal> { + @Override public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length > 1) throw new IllegalArgumentException(); @@ -37,4 +38,9 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> { else return new BigDecimal(values[0]); } + + @Override + public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java index 506ed19..aaa754a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java @@ -31,6 +31,7 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> { // avoid repeated object creation private DoubleMutable current = new DoubleMutable(); + @Override public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length > 1) throw new IllegalArgumentException(); @@ -42,4 +43,9 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> { l.set(Double.parseDouble(values[0])); return l; } + + @Override + public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java index 5bf1257..bdc1704 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java @@ -31,6 +31,7 @@ public class LongIngester extends MeasureIngester<LongMutable> { // avoid repeated object creation private LongMutable current = new LongMutable(); + @Override public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length > 1) throw new IllegalArgumentException(); @@ -42,4 +43,9 @@ public class LongIngester extends MeasureIngester<LongMutable> { l.set(Long.parseLong(values[0])); return l; } + + @Override + public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index ee90818..2ad7630 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -74,6 +74,11 @@ public class HLLCMeasureType extends MeasureType { hllc.add(v == null ? "__nUlL__" : v); return hllc; } + + @Override + public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) { + throw new UnsupportedOperationException(); + } }; } @@ -91,10 +96,4 @@ public class HLLCMeasureType extends MeasureType { return null; } - @Override - public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) { - // TODO Auto-generated method stub - return null; - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index 1d2c87b..561f9f1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -18,9 +18,11 @@ package org.apache.kylin.measure.topn; +import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; @@ -88,6 +90,37 @@ public class TopNMeasureType extends MeasureType { topNCounter.offer(key, counter); return topNCounter; } + + @SuppressWarnings("unchecked") + @Override + public TopNCounter reEncodeDictionary(TopNCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { + TopNCounter<ByteArray> topNCounter = (TopNCounter<ByteArray>) value; + + TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); + Dictionary<String> sourceDict = oldDicts.get(colRef); + Dictionary<String> mergedDict = newDicts.get(colRef); + + int topNSize = topNCounter.size(); + byte[] newIdBuf = new byte[topNSize * mergedDict.getSizeOfId()]; + byte[] literal = new byte[sourceDict.getSizeOfValue()]; + + int bufOffset = 0; + for (Counter<ByteArray> c : topNCounter) { + int oldId = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int newId; + int size = sourceDict.getValueBytesFromId(oldId, literal, 0); + if (size < 0) { + newId = mergedDict.nullId(); + } else { + newId = mergedDict.getIdFromValueBytes(literal, 0, size); + } + + BytesUtil.writeUnsigned(newId, newIdBuf, bufOffset, mergedDict.getSizeOfId()); + c.getItem().set(newIdBuf, bufOffset, mergedDict.getSizeOfId()); + bufOffset += mergedDict.getSizeOfId(); + } + return value; + } }; } @@ -98,14 +131,8 @@ public class TopNMeasureType extends MeasureType { @Override public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) { - // TODO Auto-generated method stub - return null; + TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(1); + return Collections.singletonList(literalCol); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index bc1c883..b4682dd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -22,10 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; @@ -48,6 +47,8 @@ import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -60,6 +61,7 @@ import com.google.common.collect.Lists; /** * @author shaoshi */ +@SuppressWarnings({ "rawtypes", "unchecked" }) public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> { private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class); @@ -87,7 +89,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By private MeasureCodec codec; private ByteArrayWritable outputValue = new ByteArrayWritable(); - private List<Integer> topNMeasureIdx; + private List<Pair<Integer, MeasureIngester>> dictMeasures; + private Map<TblColRef, Dictionary<String>> oldDicts; + private Map<TblColRef, Dictionary<String>> newDicts; @Override protected void setup(Context context) throws IOException, InterruptedException { @@ -115,12 +119,18 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By measureDescs = cubeDesc.getMeasures(); codec = new MeasureCodec(measureDescs); - topNMeasureIdx = Lists.newArrayList(); + dictMeasures = Lists.newArrayList(); for (int i = 0; i < measureDescs.size(); i++) { - if (measureDescs.get(i).getFunction().isTopN()) { - topNMeasureIdx.add(i); + MeasureDesc measureDesc = measureDescs.get(i); + MeasureType measureType = MeasureType.create(measureDesc.getFunction()); + if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) { + dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester())); } } + if (dictMeasures.size() > 0) { + oldDicts = sourceCubeSegment.buildDictionaryMap(); + newDicts = mergedCubeSegment.buildDictionaryMap(); + } } @Override @@ -193,7 +203,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By outputKey.set(newKeyBuf.array(), 0, fullKeySize); // re-encode measures if dictionary is used - reEncodeMeasure(value); + if (dictMeasures.size() > 0) { + reEncodeMeasure(value); + } valueBuf.clear(); codec.encode(value, valueBuf); @@ -217,45 +229,11 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By } } - @SuppressWarnings("unchecked") private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { - // currently only topN uses dictionary in measure obj - if (topNMeasureIdx.isEmpty()) - return; - - int bufOffset = 0; - for (int idx : topNMeasureIdx) { - TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; - - MeasureDesc measureDesc = measureDescs.get(idx); - TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); - - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } - - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - bufOffset += mergedDict.getSizeOfId(); - } + for (Pair<Integer, MeasureIngester> pair : dictMeasures) { + int i = pair.getFirst(); + MeasureIngester ingester = pair.getSecond(); + measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index c0277b5..4fc7236 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -22,17 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -48,6 +48,8 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -57,6 +59,7 @@ import com.google.common.collect.Lists; /** * @author ysong1, honma */ +@SuppressWarnings({"rawtypes", "unchecked"}) public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private KylinConfig config; @@ -78,7 +81,9 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>(); // for re-encode measures that use dictionary - private List<Integer> topNMeasureIdx; + private List<Pair<Integer, MeasureIngester>> dictMeasures; + private Map<TblColRef, Dictionary<String>> oldDicts; + private Map<TblColRef, Dictionary<String>> newDicts; private List<MeasureDesc> measureDescs; private MeasureCodec codec; private Object[] measureObjs; @@ -115,12 +120,19 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { measureObjs = new Object[measureDescs.size()]; valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); outputValue = new Text(); - topNMeasureIdx = Lists.newArrayList(); + + dictMeasures = Lists.newArrayList(); for (int i = 0; i < measureDescs.size(); i++) { - if (measureDescs.get(i).getFunction().isTopN()) { - topNMeasureIdx.add(i); + MeasureDesc measureDesc = measureDescs.get(i); + MeasureType measureType = MeasureType.create(measureDesc.getFunction()); + if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) { + dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester())); } } + if (dictMeasures.size() > 0) { + oldDicts = sourceCubeSegment.buildDictionaryMap(); + newDicts = mergedCubeSegment.buildDictionaryMap(); + } } private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); @@ -214,9 +226,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { outputKey.set(newKeyBuf.array(), 0, fullKeySize); // re-encode measures if dictionary is used - if (topNMeasureIdx.size() > 0) { + if (dictMeasures.size() > 0) { codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); - reEncodeMeasure(measureObjs); + for (Pair<Integer, MeasureIngester> pair : dictMeasures) { + int i = pair.getFirst(); + MeasureIngester ingester = pair.getSecond(); + measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); + } valueBuf.clear(); codec.encode(measureObjs, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); @@ -240,43 +256,4 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { return ret; } } - - @SuppressWarnings("unchecked") - private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { - int bufOffset = 0; - for (int idx : topNMeasureIdx) { - // only TopN measure uses dic - TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; - - MeasureDesc measureDesc = measureDescs.get(idx); - TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); - - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); - } - - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - bufOffset += mergedDict.getSizeOfId(); - } - } - } }