KYLIN-942 Code review
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e62e0b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e62e0b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e62e0b3e

Branch: refs/heads/2.x-staging
Commit: e62e0b3ef19622c35b70840d652baf49ce97e1ac
Parents: b52c1bb
Author: Li, Yang <yang...@ebay.com>
Authored: Fri Nov 20 11:24:47 2015 +0800
Committer: Li, Yang <yang...@ebay.com>
Committed: Mon Nov 23 17:23:50 2015 +0800

----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 10 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 15 +-
.../InMemCubeBuilderInputConverter.java | 14 +-
.../cube/inmemcubing/InMemCubeBuilderUtils.java | 10 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 55 +------
.../model/validation/rule/FunctionRule.java | 2 +-
.../cube/inmemcubing/InMemCubeBuilderTest.java | 12 +-
.../apache/kylin/metadata/model/ColumnDesc.java | 1 +
.../kylin/metadata/model/DatabaseDesc.java | 1 +
.../kylin/metadata/model/FunctionDesc.java | 58 ++++++--
.../kylin/metadata/model/MeasureDesc.java | 12 ++
.../kylin/metadata/model/ParameterDesc.java | 41 ++---
.../kylin/storage/cache/StorageMockUtils.java | 2 -
.../engine/mr/steps/BaseCuboidMapperBase.java | 24 +--
.../mr/steps/MergeCuboidFromStorageMapper.java | 129 ++++++++--------
.../engine/mr/steps/MergeCuboidMapper.java | 144 +++++++++---------
.../localmeta/cube/test_kylin_cube_topn.json | 10 --
.../cube/test_kylin_cube_topn_left_join.json | 10 --
.../cube_desc/test_kylin_cube_topn_desc.json | 148 ------------------
.../test_kylin_cube_topn_left_join_desc.json | 149 -------------------
.../test_kylin_cube_without_slr_desc.json | 11 +-
...t_kylin_cube_without_slr_left_join_desc.json | 11 +-
.../kylin/invertedindex/model/IIDesc.java | 14 +-
.../invertedindex/model/IIKeyValueCodec.java | 1 -
.../kylin/query/relnode/OLAPAggregateRel.java | 2 +- .../storage/hbase/cube/v1/CubeStorageQuery.java | 82 ++++------ .../cube/v1/SerializedHBaseTupleIterator.java | 22 ++- .../storage/hbase/cube/v2/CubeStorageQuery.java | 90 +++++------ .../endpoint/EndpointTupleIterator.java | 1 - .../storage/hbase/steps/RowValueDecoder.java | 12 ++ 30 files changed, 369 insertions(+), 724 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index 3bb246a..343bf11 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -132,15 +132,15 @@ public class CubeCapabilityChecker { for (MeasureDesc measure : cubeDesc.getMeasures()) { if (measure.getFunction().isTopN()) { List<TblColRef> cols = measure.getFunction().getParameter().getColRefs(); - TblColRef displayCol = cols.get(cols.size() - 1); - if (digest.groupbyColumns.contains(displayCol)) { - dimensionColumnsCopy.remove(displayCol); + TblColRef literalCol = cols.get(cols.size() - 1); + if (digest.groupbyColumns.contains(literalCol)) { + dimensionColumnsCopy.remove(literalCol); if (isMatchedWithDimensions(dimensionColumnsCopy, cube)) { - if (measure.getFunction().isCompatible(onlyFunction)) { + if (measure.getFunction().isTopNCompatibleSum(onlyFunction)) { return true; } } - dimensionColumnsCopy.add(displayCol); + dimensionColumnsCopy.add(literalCol); } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index a179d70..e9d940a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -17,7 +17,12 @@ package org.apache.kylin.cube.inmemcubing; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -25,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.Pair; @@ -43,8 +47,6 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.measure.DoubleMutable; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -59,14 +61,12 @@ import com.google.common.collect.Lists; public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class); - private static final LongMutable ONE = new LongMutable(1l); static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 
0.1; private final CuboidScheduler cuboidScheduler; private final long baseCuboidId; private final int totalCuboidCount; private final CubeJoinedFlatTableDesc intermediateTableDesc; - private final MeasureCodec measureCodec; private final String[] metricsAggrFuncs; private final MeasureDesc[] measureDescs; private final int measureCount; @@ -87,7 +87,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); this.totalCuboidCount = cuboidScheduler.getCuboidCount(); this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); - this.measureCodec = new MeasureCodec(cubeDesc.getMeasures()); this.measureCount = cubeDesc.getMeasures().size(); this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); @@ -510,7 +509,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { this.input = input; this.record = new GTRecord(info); this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, - InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), + InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), info); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index d9099ce..69a9fc9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -47,18 +47,18 @@ public class InMemCubeBuilderInputConverter { private final 
MeasureCodec measureCodec; private final int measureCount; private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - private final Map<Integer, Dictionary<String>> topNDisplayColDictMap; + private final Map<Integer, Dictionary<String>> topNLiteralColDictMap; private final GTInfo gtInfo; - public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNDisplayColDictMap, GTInfo gtInfo) { + public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) { this.cubeDesc = cubeDesc; this.gtInfo = gtInfo; this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); this.measureCount = cubeDesc.getMeasures().size(); this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); this.measureCodec = new MeasureCodec(cubeDesc.getMeasures()); - this.topNDisplayColDictMap = Preconditions.checkNotNull(topNDisplayColDictMap, "topNDisplayColDictMap cannot be null"); + this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null"); } public final GTRecord convert(List<String> row) { @@ -102,13 +102,13 @@ public class InMemCubeBuilderInputConverter { } else if (function.isTopN()) { // encode the key column with dict, and get the counter column; int keyColIndex = flatTableIdx[flatTableIdx.length - 1]; - Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex); - int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex)); + Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex); + int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex)); valueBuf.clear(); - valueBuf.putInt(displayColDict.getSizeOfId()); + valueBuf.putInt(literalColDict.getSizeOfId()); valueBuf.putInt(keyColEncoded); if (flatTableIdx.length == 1) { - // only displayCol, use 1.0 as counter + // only literalCol, use 1.0 as counter 
valueBuf.putDouble(1.0); } else { // get the counter column value http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java index f0ee372..9d819a4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java @@ -35,19 +35,19 @@ import java.util.Map; */ public final class InMemCubeBuilderUtils { - public static final HashMap<Integer, Dictionary<String>> createTopNDisplayColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { + public static final HashMap<Integer, Dictionary<String>> createTopNLiteralColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { HashMap<Integer, Dictionary<String>> result = Maps.newHashMap(); for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) { MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx); FunctionDesc func = measureDesc.getFunction(); if (func.isTopN()) { int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx]; - int displayColIdx = flatTableIdx[flatTableIdx.length - 1]; - TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1); + int literalColIdx = flatTableIdx[flatTableIdx.length - 1]; + TblColRef literalCol = func.getTopNLiteralColumn(); @SuppressWarnings("unchecked") - Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol); + Dictionary<String> dictionary = (Dictionary<String>) 
dictionaryMap.get(literalCol); //Preconditions.checkNotNull(dictionary);//FIXME disable check since dictionary is null when building empty segment - result.put(displayColIdx, dictionary); + result.put(literalColIdx, dictionary); } } return result; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 2250945..ef563ed 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -50,7 +50,6 @@ import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -130,7 +129,6 @@ public class CubeDesc extends RootPersistentEntity { private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>(); private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>(); - private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>(); private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap(); private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap(); @@ -414,7 +412,7 @@ public class CubeDesc extends RootPersistentEntity { } return calculateSignature().equals(getSignature()); } - + public String calculateSignature() { MessageDigest md = null; try { @@ -646,37 +644,12 @@ public class CubeDesc extends RootPersistentEntity { m.setDependentMeasureRef(m.getDependentMeasureRef().toUpperCase()); } - 
FunctionDesc f = m.getFunction(); - f.setExpression(f.getExpression().toUpperCase()); - f.initReturnDataType(); - - ParameterDesc p = f.getParameter(); - p.normalizeColumnValue(); - - ArrayList<TblColRef> colRefs = Lists.newArrayList(); - if (p.isColumnType()) { - for (String cName : p.getValue().split("\\s*,\\s*")) { - ColumnDesc sourceColumn = factTable.findColumnByName(cName); - TblColRef colRef = new TblColRef(sourceColumn); - colRefs.add(colRef); - allColumns.add(colRef); - } - } - - // for topN - if (StringUtils.isNotEmpty(p.getDisplayColumn())) { - ColumnDesc sourceColumn = factTable.findColumnByName(p.getDisplayColumn()); - TblColRef colRef = new TblColRef(sourceColumn); - colRefs.add(colRef); - measureDisplayColumns.add(colRef); - allColumns.add(colRef); - } - - if (colRefs.isEmpty() == false) - p.setColRefs(colRefs); + FunctionDesc func = m.getFunction(); + func.init(factTable); + allColumns.addAll(func.getParameter().getColRefs()); // verify holistic count distinct as a dependent measure - if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) { + if (func.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) { throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!"); } } @@ -844,24 +817,10 @@ public class CubeDesc extends RootPersistentEntity { } } - for (TblColRef colRef : measureDisplayColumns) { - if (!result.contains(colRef)) - result.add(colRef); + for (MeasureDesc measure : measures) { + result.addAll(measure.getColumnsNeedDictionary()); } return result; } - public LinkedHashSet<TblColRef> getMeasureDisplayColumns() { - return measureDisplayColumns; - } - - - public boolean hasMeasureUsingDictionary() { - for (MeasureDesc measureDesc : this.getMeasures()) { - if (measureDesc.getFunction().isTopN()) - return true; - } - - return false; - } } 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java index 80bd2f7..1920fc7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java @@ -91,7 +91,7 @@ public class FunctionRule implements IValidatorRule<CubeDesc> { if (StringUtils.equalsIgnoreCase(FunctionDesc.PARAMETER_TYPE_COLUMN, type)) { validateColumnParameter(context, cube, value); - } else if (StringUtils.equals(FunctionDesc.PARAMTER_TYPE_CONSTANT, type)) { + } else if (StringUtils.equals(FunctionDesc.PARAMETER_TYPE_CONSTANT, type)) { validateCostantParameter(context, cube, value); } validateReturnType(context, cube, func); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java index e3fb30e..f853b08 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java @@ -188,13 +188,13 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { FunctionDesc func = measureDesc.getFunction(); if (func.isTopN()) { int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; - int displayColIdx = flatTableIdx[flatTableIdx.length - 1]; - 
TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1); - logger.info("Building dictionary for " + displayCol); - List<byte[]> valueList = readValueList(flatTable, nColumns, displayColIdx); - Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(displayCol.getType(), new IterableDictionaryValueEnumerator(valueList)); + int literalColIdx = flatTableIdx[flatTableIdx.length - 1]; + TblColRef literalCol = func.getTopNLiteralColumn(); + logger.info("Building dictionary for " + literalCol); + List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx); + Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList)); - result.put(displayCol, dict); + result.put(literalCol, dict); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java index 12371ce..6162477 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java @@ -30,6 +30,7 @@ import java.io.Serializable; * Column Metadata from Source. All name should be uppercase. 
* <p/> */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class ColumnDesc implements Serializable { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java index 215e86c..6b8447d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java @@ -27,6 +27,7 @@ import java.util.Set; /** * @author xjiang */ +@SuppressWarnings("serial") public class DatabaseDesc implements Serializable { private String name; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index d10f395..b8cefa2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -18,11 +18,13 @@ package org.apache.kylin.metadata.model; +import java.util.ArrayList; import java.util.Collection; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; /** */ @@ -36,9 +38,9 @@ public class FunctionDesc { public static final 
String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; public static final String FUNC_TOP_N = "TOP_N"; - public static final String PARAMTER_TYPE_CONSTANT = "constant"; + public static final String PARAMETER_TYPE_CONSTANT = "constant"; public static final String PARAMETER_TYPE_COLUMN = "column"; - + @JsonProperty("expression") private String expression; @JsonProperty("parameter") @@ -49,6 +51,26 @@ public class FunctionDesc { private DataType returnDataType; private boolean isDimensionAsMetric = false; + public void init(TableDesc factTable) { + expression = expression.toUpperCase(); + returnDataType = DataType.getInstance(returnType); + + for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) { + p.setValue(p.getValue().toUpperCase()); + } + + ArrayList<TblColRef> colRefs = Lists.newArrayList(); + for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) { + if (p.isColumnType()) { + ColumnDesc sourceColumn = factTable.findColumnByName(p.getValue()); + TblColRef colRef = new TblColRef(sourceColumn); + colRefs.add(colRef); + } + } + + parameter.setColRefs(colRefs); + } + public String getRewriteFieldName() { if (isSum()) { return getParameter().getValue(); @@ -161,11 +183,6 @@ public class FunctionDesc { public void setReturnType(String returnType) { this.returnType = returnType; - this.initReturnDataType(); - } - - // Jackson does not provide object post-processing currently - public void initReturnDataType() { this.returnDataType = DataType.getInstance(returnType); } @@ -225,13 +242,32 @@ public class FunctionDesc { return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]"; } - public boolean isCompatible(FunctionDesc another) { - if (another == null) { + // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. 
SELLER) + public TblColRef getTopNNumericColumn() { + if (isTopN() == false) + throw new IllegalStateException(); + + return parameter.getColRefs().get(0); + } + + // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER) + public TblColRef getTopNLiteralColumn() { + if (isTopN() == false) + throw new IllegalStateException(); + + return parameter.getColRefs().get(1); + } + + public boolean isTopNCompatibleSum(FunctionDesc sum) { + if (isTopN() == false) + throw new IllegalStateException(); + + if (sum == null) { return false; } - if (this.isTopN() && another.isSum()) { - if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0))) + if (this.isTopN() && sum.isSum()) { + if (this.getParameter().getColRefs().get(0).equals(sum.getParameter().getColRefs().get(0))) return true; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java index 1561b1f..618d25a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java @@ -18,6 +18,9 @@ package org.apache.kylin.metadata.model; +import java.util.Collections; +import java.util.List; + import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; @@ -37,6 +40,15 @@ public class MeasureDesc { @JsonProperty("dependent_measure_ref") private String dependentMeasureRef; + public List<TblColRef> getColumnsNeedDictionary() { + // measure could store literal values using dictionary encoding to save space, like TopN + if (function.isTopN()) { + 
return Collections.singletonList(function.getTopNLiteralColumn()); + } else { + return Collections.emptyList(); + } + } + public int getId() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java index 9773b84..2cf4374 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java @@ -19,11 +19,8 @@ package org.apache.kylin.metadata.model; import java.io.UnsupportedEncodingException; -import java.util.Arrays; import java.util.List; -import org.apache.commons.lang.StringUtils; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; @@ -33,15 +30,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class ParameterDesc { - public static final String COLUMN_TYPE = "column"; - @JsonProperty("type") private String type; @JsonProperty("value") private String value; - @JsonProperty("displaycolumn") - private String displayColumn; + @JsonProperty("next_parameter") + private ParameterDesc nextParameter; private List<TblColRef> colRefs; @@ -65,14 +60,6 @@ public class ParameterDesc { this.value = value; } - public String getDisplayColumn() { - return displayColumn; - } - - public void setDisplayColumn(String displayColumn) { - this.displayColumn = displayColumn; - } - public List<TblColRef> getColRefs() { return colRefs; } @@ 
-80,19 +67,17 @@ public class ParameterDesc { public void setColRefs(List<TblColRef> colRefs) { this.colRefs = colRefs; } + + public ParameterDesc getNextParameter() { + return nextParameter; + } - public boolean isColumnType() { - return COLUMN_TYPE.equals(type); + public void setNextParameter(ParameterDesc nextParameter) { + this.nextParameter = nextParameter; } - public void normalizeColumnValue() { - if (isColumnType()) { - String values[] = value.split("\\s*,\\s*"); - for (int i = 0; i < values.length; i++) - values[i] = values[i].toUpperCase(); - Arrays.sort(values); - value = StringUtils.join(values, ","); - } + public boolean isColumnType() { + return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type); } @Override @@ -102,7 +87,7 @@ public class ParameterDesc { ParameterDesc that = (ParameterDesc) o; - if (displayColumn != null ? !displayColumn.equals(that.displayColumn) : that.displayColumn != null) return false; + if (nextParameter != null ? !nextParameter.equals(that.nextParameter) : that.nextParameter != null) return false; if (type != null ? !type.equals(that.type) : that.type != null) return false; if (value != null ? !value.equals(that.value) : that.value != null) return false; @@ -113,13 +98,13 @@ public class ParameterDesc { public int hashCode() { int result = type != null ? type.hashCode() : 0; result = 31 * result + (value != null ? value.hashCode() : 0); - result = 31 * result + (displayColumn != null ? displayColumn.hashCode() : 0); + result = 31 * result + (nextParameter != null ? 
nextParameter.hashCode() : 0); return result; } @Override public String toString() { - return "ParameterDesc [type=" + type + ", value=" + value + ", displayColumn=" + displayColumn + "]"; + return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java ---------------------------------------------------------------------- diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java index 2b5ceee..2898f93 100644 --- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java +++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java @@ -117,7 +117,6 @@ public class StorageMockUtils { return compareFilter; } - @SuppressWarnings("unused") public static TupleFilter buildAndFilter(List<TblColRef> columns) { CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); @@ -127,7 +126,6 @@ public class StorageMockUtils { return andFilter; } - @SuppressWarnings("unused") public static TupleFilter buildOrFilter(List<TblColRef> columns) { CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0)); CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1)); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index ed1fd4a..44311c2 100644 --- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -57,7 +57,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL private Text outputKey = new Text(); private Text outputValue = new Text(); private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - private Map<Integer, Dictionary<String>> topNDisplayColDictMap; + private Map<Integer, Dictionary<String>> topNLiteralColDictMap; @Override protected void setup(Context context) throws IOException { @@ -92,21 +92,21 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; keyBytesBuf = new byte[colCount][]; - initTopNDisplayColDictionaryMap(); + initTopNLiteralColDictionaryMap(); initNullBytes(); } - private void initTopNDisplayColDictionaryMap() { - topNDisplayColDictMap = Maps.newHashMap(); + private void initTopNLiteralColDictionaryMap() { + topNLiteralColDictMap = Maps.newHashMap(); for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) { MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx); FunctionDesc func = measureDesc.getFunction(); if (func.isTopN()) { int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx]; - int displayColIdx = flatTableIdx[flatTableIdx.length - 1]; - TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1); - Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(displayCol); - topNDisplayColDictMap.put(displayColIdx, dictionary); + int literalColIdx = flatTableIdx[flatTableIdx.length - 1]; + TblColRef literalCol = func.getTopNLiteralColumn(); + Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(literalCol); + topNLiteralColDictMap.put(literalColIdx, dictionary); } } } @@ -174,13 +174,13 @@ 
public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL else if(func.isTopN()) { // encode the key column with dict, and get the counter column; int keyColIndex = flatTableIdx[flatTableIdx.length - 1]; - Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex); - int keyColEncoded = displayColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value)); + Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex); + int keyColEncoded = literalColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value)); valueBuf.clear(); - valueBuf.putInt(displayColDict.getSizeOfId()); + valueBuf.putInt(literalColDict.getSizeOfId()); valueBuf.putInt(keyColEncoded); if (flatTableIdx.length == 1) { - // only displayCol, use 1.0 as counter + // only literalCol, use 1.0 as counter valueBuf.putDouble(1.0); } else { // get the counter column value http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index 286ff02..fc616fa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -23,9 +23,6 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; @@ -51,7 +48,6 @@ import org.apache.kylin.engine.mr.MRUtil; import 
org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.measure.MeasureCodec; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -59,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * @author shaoshi @@ -83,29 +80,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By private RowKeySplitter rowKeySplitter; private RowKeyEncoderProvider rowKeyEncoderProvider; - private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); + private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>(); + private List<MeasureDesc> measureDescs; private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); private MeasureCodec codec; private ByteArrayWritable outputValue = new ByteArrayWritable(); - private List<MeasureDesc> measuresDescs; - private Integer[] measureIdxUsingDict; - - private Boolean checkNeedMerging(TblColRef col) throws IOException { - Boolean ret = dictsNeedMerging.get(col); - if (ret != null) - return ret; - else { - ret = cubeDesc.getRowkey().isUseDictionary(col); - if (ret) { - String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable(); - ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); - } - dictsNeedMerging.put(col, ret); - return ret; - } - } + private List<Integer> topNMeasureIdx; @Override protected void setup(Context context) throws IOException, InterruptedException { @@ -130,17 +112,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By rowKeySplitter = new 
RowKeySplitter(sourceCubeSegment, 65, 255); rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); - measuresDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(cubeDesc.getMeasures()); - if (cubeDesc.hasMeasureUsingDictionary()) { - List<Integer> measuresUsingDict = Lists.newArrayList(); - for (int i = 0; i < measuresDescs.size(); i++) { - if (measuresDescs.get(i).getFunction().isTopN()) { - // so far only TopN uses dic - measuresUsingDict.add(i); - } + measureDescs = cubeDesc.getMeasures(); + codec = new MeasureCodec(measureDescs); + + topNMeasureIdx = Lists.newArrayList(); + for (int i = 0; i < measureDescs.size(); i++) { + if (measureDescs.get(i).getFunction().isTopN()) { + topNMeasureIdx.add(i); } - measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]); } } @@ -213,11 +192,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); outputKey.set(newKeyBuf.array(), 0, fullKeySize); - // encode measure if it uses dictionary - if (cubeDesc.hasMeasureUsingDictionary()) { - reEncodeMeasure(value); - } - + // re-encode measures if dictionary is used + reEncodeMeasure(value); + valueBuf.clear(); codec.encode(value, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); @@ -225,46 +202,60 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By context.write(outputKey, outputValue); } + private Boolean checkNeedMerging(TblColRef col) throws IOException { + Boolean ret = dimensionsNeedDict.get(col); + if (ret != null) + return ret; + else { + ret = cubeDesc.getRowkey().isUseDictionary(col); + if (ret) { + String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable(); + ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); + } + dimensionsNeedDict.put(col, ret); + return ret; + } 
+ } + + @SuppressWarnings("unchecked") private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { + // currently only topN uses dictionary in measure obj + if (topNMeasureIdx.isEmpty()) + return; + int bufOffset = 0; - for (int idx : measureIdxUsingDict) { - // only TopN measure uses dic + for (int idx : topNMeasureIdx) { TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; - MeasureDesc measureDesc = measuresDescs.get(idx); - String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); - if (StringUtils.isNotEmpty(displayCol)) { - ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol); - TblColRef colRef = new TblColRef(sourceColumn); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + MeasureDesc measureDesc = measureDescs.get(idx); + TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); + Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + } - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - 
mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); } - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - bufOffset += mergedDict.getSizeOfId(); - } + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + bufOffset += mergedDict.getSizeOfId(); } } - } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 68d1481..6c2679e 
100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -18,8 +18,13 @@ package org.apache.kylin.engine.mr.steps; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; @@ -43,17 +48,11 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.measure.MeasureCodec; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.collect.Lists; /** * @author ysong1, honma @@ -76,29 +75,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private RowKeySplitter rowKeySplitter; private RowKeyEncoderProvider rowKeyEncoderProvider; - private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); - private List<MeasureDesc> measuresDescs; + private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>(); + + // for re-encode measures that use dictionary + private List<Integer> topNMeasureIdx; + private List<MeasureDesc> measureDescs; private MeasureCodec codec; private Object[] measureObjs; - private Integer[] measureIdxUsingDict; private ByteBuffer 
valueBuf; private Text outputValue; - private Boolean checkNeedMerging(TblColRef col) throws IOException { - Boolean ret = dictsNeedMerging.get(col); - if (ret != null) - return ret; - else { - ret = cubeDesc.getRowkey().isUseDictionary(col); - if (ret) { - String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable(); - ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); - } - dictsNeedMerging.put(col, ret); - return ret; - } - } - @Override protected void setup(Context context) throws IOException, InterruptedException { super.bindCurrentConfiguration(context.getConfiguration()); @@ -124,20 +110,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); - if (cubeDesc.hasMeasureUsingDictionary()) { - measuresDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measuresDescs); - measureObjs = new Object[measuresDescs.size()]; - List<Integer> measuresUsingDict = Lists.newArrayList(); - for (int i = 0; i < measuresDescs.size(); i++) { - if (measuresDescs.get(i).getFunction().isTopN()) { - // so far only TopN uses dic - measuresUsingDict.add(i); - } + measureDescs = cubeDesc.getMeasures(); + codec = new MeasureCodec(measureDescs); + measureObjs = new Object[measureDescs.size()]; + valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + outputValue = new Text(); + topNMeasureIdx = Lists.newArrayList(); + for (int i = 0; i < measureDescs.size(); i++) { + if (measureDescs.get(i).getFunction().isTopN()) { + topNMeasureIdx.add(i); } - measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]); - valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - outputValue = new Text(); } } @@ -231,60 +213,70 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> { rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); outputKey.set(newKeyBuf.array(), 0, fullKeySize); - // encode measure if it uses dictionary - if (cubeDesc.hasMeasureUsingDictionary()) { + // re-encode measures if dictionary is used + if (topNMeasureIdx.size() > 0) { codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); reEncodeMeasure(measureObjs); valueBuf.clear(); codec.encode(measureObjs, valueBuf); outputValue.set(valueBuf.array(), 0, valueBuf.position()); value = outputValue; - } - + } + context.write(outputKey, value); } + private Boolean checkNeedMerging(TblColRef col) throws IOException { + Boolean ret = dimensionsNeedDict.get(col); + if (ret != null) + return ret; + else { + ret = cubeDesc.getRowkey().isUseDictionary(col); + if (ret) { + String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable(); + ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); + } + dimensionsNeedDict.put(col, ret); + return ret; + } + } + + @SuppressWarnings("unchecked") private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException { int bufOffset = 0; - for (int idx : measureIdxUsingDict) { + for (int idx : topNMeasureIdx) { // only TopN measure uses dic TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx]; - MeasureDesc measureDesc = measuresDescs.get(idx); - String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase(); - if (StringUtils.isNotEmpty(displayCol)) { - ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol); - TblColRef colRef = new TblColRef(sourceColumn); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); - Dictionary<?> mergedDict = 
dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + MeasureDesc measureDesc = measureDescs.get(idx); + TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); + DictionaryManager dictMgr = DictionaryManager.getInstance(config); + Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef)); + Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef)); + + int topNSize = topNCounters.size(); + while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + } - int topNSize = topNCounters.size(); - while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || // - mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBodyBuf; - newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); + for (Counter<ByteArray> c : topNCounters) { + int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); + int idInMergedDict; + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); + if (size < 0) { + idInMergedDict = mergedDict.nullId(); + } else { + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); } - for (Counter<ByteArray> c : topNCounters) { - int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length()); - int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, 
bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); - bufOffset += mergedDict.getSizeOfId(); - } + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); + bufOffset += mergedDict.getSizeOfId(); } } - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json deleted file mode 100644 index 903fc15..0000000 --- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "uuid" : "33354455-a33e-4b69-83dd-0bb8b1f8c53b", - "last_modified" : 0, - "name" : "test_kylin_cube_topn", - "owner" : null, - "version" : null, - "descriptor" : "test_kylin_cube_topn_desc", - "segments" : [ ], - "create_time" : null -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json deleted file mode 100644 index 6f57561..0000000 --- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "uuid" : "44454455-a33e-4b69-83dd-0bb8b1f8c53b", - "last_modified" : 0, - "name" : 
"test_kylin_cube_topn_left_join", - "owner" : null, - "version" : null, - "descriptor" : "test_kylin_cube_topn_left_join_desc", - "segments" : [ ], - "create_time" : null -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json deleted file mode 100644 index fddbb10..0000000 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json +++ /dev/null @@ -1,148 +0,0 @@ -{ - "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92", - "name": "test_kylin_cube_topn_desc", - "description": null, - "dimensions": [ - { - "id": 0, - "name": "CAL_DT", - "table": "EDW.TEST_CAL_DT", - "column": null, - "derived": [ - "WEEK_BEG_DT" - ], - "hierarchy": false - } - ], - "measures": [ - { - "id": 1, - "name": "GMV_SUM", - "function": { - "expression": "SUM", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 2, - "name": "GMV_MIN", - "function": { - "expression": "MIN", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 3, - "name": "GMV_MAX", - "function": { - "expression": "MAX", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 4, - "name": "TRANS_CNT", - "function": { - "expression": "COUNT", - "parameter": { - "type": "constant", - "value": "1" - }, - "returntype": "bigint" - }, - "dependent_measure_ref": null - }, - { - "id": 5, - "name": "ITEM_COUNT_SUM", - "function": { - "expression": "SUM", - "parameter": { - "type": "column", - 
"value": "ITEM_COUNT" - }, - "returntype": "bigint" - }, - "dependent_measure_ref": null - }, - { - "id": 6, - "name": "TOP_SELLER", - "function": { - "expression": "TOP_N", - "parameter": { - "type": "column", - "value": "PRICE", - "displaycolumn": "SELLER_ID" - }, - "returntype": "topn(100)" - }, - "dependent_measure_ref": null - } - ], - "rowkey": { - "rowkey_columns": [ - { - "column": "cal_dt", - "length": 0, - "dictionary": "true", - "mandatory": false - } - ], - "aggregation_groups": [ - [ - "cal_dt" - ] - ] - }, - "last_modified": 1422435345330, - "model_name": "test_kylin_inner_join_model_desc", - "null_string": null, - "hbase_mapping": { - "column_family": [ - { - "name": "f1", - "columns": [ - { - "qualifier": "m", - "measure_refs": [ - "gmv_sum", - "gmv_min", - "gmv_max", - "trans_cnt", - "item_count_sum" - ] - } - ] - }, { - "name": "f2", - "columns": [ - { - "qualifier": "m", - "measure_refs": [ - "top_seller" - ] - } - ] - } - ] - }, - "notify_list": null, - "engine_type": 2, - "storage_type": 2 -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json deleted file mode 100644 index 6aecaae..0000000 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json +++ /dev/null @@ -1,149 +0,0 @@ -{ - "uuid": "5445a905-1fc6-4f67-985c-38fa5aeafd92", - "name": "test_kylin_cube_topn_left_join_desc", - "description": null, - "dimensions": [ - { - "id": 0, - "name": "CAL_DT", - "table": "EDW.TEST_CAL_DT", - "column": null, - "derived": [ - "WEEK_BEG_DT" - ], - "hierarchy": false - } - ], - "measures": [ - { - "id": 1, - "name": "GMV_SUM", - "function": { - 
"expression": "SUM", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 2, - "name": "GMV_MIN", - "function": { - "expression": "MIN", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 3, - "name": "GMV_MAX", - "function": { - "expression": "MAX", - "parameter": { - "type": "column", - "value": "PRICE" - }, - "returntype": "decimal(19,4)" - }, - "dependent_measure_ref": null - }, - { - "id": 4, - "name": "TRANS_CNT", - "function": { - "expression": "COUNT", - "parameter": { - "type": "constant", - "value": "1" - }, - "returntype": "bigint" - }, - "dependent_measure_ref": null - }, - { - "id": 5, - "name": "ITEM_COUNT_SUM", - "function": { - "expression": "SUM", - "parameter": { - "type": "column", - "value": "ITEM_COUNT" - }, - "returntype": "bigint" - }, - "dependent_measure_ref": null - }, - { - "id": 6, - "name": "TOP_SELLER", - "function": { - "expression": "TOP_N", - "parameter": { - "type": "column", - "value": "PRICE", - "displaycolumn": "SELLER_ID" - }, - "returntype": "topn(100)" - }, - "dependent_measure_ref": null - } - ], - "rowkey": { - "rowkey_columns": [ - { - "column": "cal_dt", - "length": 0, - "dictionary": "true", - "mandatory": false - } - ], - "aggregation_groups": [ - [ - "cal_dt" - ] - ] - }, - "last_modified": 1422435345330, - "model_name": "test_kylin_left_join_model_desc", - "null_string": null, - "hbase_mapping": { - "column_family": [ - { - "name": "f1", - "columns": [ - { - "qualifier": "m", - "measure_refs": [ - "gmv_sum", - "gmv_min", - "gmv_max", - "trans_cnt", - "item_count_sum" - ] - } - ] - }, - { - "name": "f2", - "columns": [ - { - "qualifier": "m", - "measure_refs": [ - "top_seller" - ] - } - ] - } - ] - }, - "notify_list": null, - "engine_type": 2, - "storage_type": 2 -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json index 3f9957b..1e007c3 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json @@ -156,7 +156,11 @@ "expression": "COUNT_DISTINCT", "parameter": { "type": "column", - "value": "LSTG_FORMAT_NAME,SELLER_ID" + "value": "LSTG_FORMAT_NAME", + "next_parameter": { + "type": "column", + "value": "SELLER_ID" + } }, "returntype": "hllc(10)" }, @@ -170,7 +174,10 @@ "parameter": { "type": "column", "value": "PRICE", - "displaycolumn": "SELLER_ID" + "next_parameter": { + "type": "column", + "value": "SELLER_ID" + } }, "returntype": "topn(100)" }, http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json index 907e338..73a58f0 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json @@ -156,7 +156,11 @@ "expression": "COUNT_DISTINCT", "parameter": { "type": "column", - "value": "LSTG_FORMAT_NAME,SELLER_ID" + "value": "LSTG_FORMAT_NAME", + "next_parameter": { + "type": "column", + "value": "SELLER_ID" + } }, "returntype": "hllc(10)" }, @@ -170,7 +174,10 
@@ "parameter": { "type": "column", "value": "PRICE", - "displaycolumn": "SELLER_ID" + "next_parameter": { + "type": "column", + "value": "SELLER_ID" + } }, "returntype": "topn(100)" }, http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java index 71737dc..452e3a3 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java @@ -35,7 +35,16 @@ import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.*; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.DimensionDesc; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IEngineAware; +import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.ParameterDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; @@ -49,6 +58,7 @@ import com.google.common.collect.Sets; /** * @author yangli9 */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class IIDesc extends 
RootPersistentEntity { @@ -231,7 +241,7 @@ public class IIDesc extends RootPersistentEntity { /** * - * @param hllType represents the presision + * @param hllType represents the precision */ private MeasureDesc makeHLLMeasure(ColumnDesc columnDesc, String hllType) { String columnName = columnDesc.getName(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java index 7e54a98..e17133f 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java @@ -19,7 +19,6 @@ package org.apache.kylin.invertedindex.model; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index e950911..cbc0c56 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -210,7 +210,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { if (!column.isInnerColumn()) { parameter = new ParameterDesc(); parameter.setValue(column.getName()); - parameter.setType("column"); + parameter.setType(FunctionDesc.PARAMETER_TYPE_COLUMN); parameter.setColRefs(Arrays.asList(column)); } } 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index f84e4e6..4d34943 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -87,32 +87,20 @@ public class CubeStorageQuery implements ICachableStorageQuery { private final CubeInstance cubeInstance; private final CubeDesc cubeDesc; private final String uuid; - private Collection<TblColRef> topNColumns; public CubeStorageQuery(CubeInstance cube) { this.cubeInstance = cube; this.cubeDesc = cube.getDescriptor(); this.uuid = cube.getUuid(); - this.topNColumns = Lists.newArrayList(); - for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { - if (measureDesc.getFunction().isTopN()) { - List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs(); - topNColumns.add(colRefs.get(colRefs.size() - 1)); - } - } } @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - // check whether this is a TopN query; - checkAndRewriteTopN(context, sqlDigest, returnTupleInfo); + // check whether this is a TopN query + checkAndRewriteTopN(sqlDigest); Collection<TblColRef> groups = sqlDigest.groupbyColumns; - TblColRef topNCol = extractTopNCol(groups); - if (topNCol != null) - groups.remove(topNCol); - TupleFilter filter = sqlDigest.filter; // build dimension & metrics @@ -156,15 +144,15 @@ public class CubeStorageQuery implements ICachableStorageQuery { // check involved measures, build value decoder for each each family:column List<RowValueDecoder> 
valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context); - //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more - //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory + // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more + // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); HConnection conn = HBaseConnection.get(context.getConnUrl()); - return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo); - //Notice we're passing filterD down to storage instead of flatFilter + // notice we're passing filterD down to storage instead of flatFilter + return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo); } @Override @@ -196,11 +184,6 @@ public class CubeStorageQuery implements ICachableStorageQuery { continue; } - // skip topN display col - if (topNColumns.contains(column)) { - continue; - } - dimensions.add(column); } } @@ -767,48 +750,33 @@ public class CubeStorageQuery implements ICachableStorageQuery { ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context); } - private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - Collection<TblColRef> groups = sqlDigest.groupbyColumns; - TblColRef topNDisplayCol = extractTopNCol(groups); - boolean hasTopN = topNDisplayCol != null; - - if (hasTopN == false) + private void checkAndRewriteTopN(SQLDigest sqlDigest) { + FunctionDesc topnFunc = null; + TblColRef topnLiteralCol = null; + for (MeasureDesc measure : cubeDesc.getMeasures()) { + 
FunctionDesc func = measure.getFunction(); + if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) { + topnFunc = func; + topnLiteralCol = func.getTopNLiteralColumn(); + } + } + + // if TopN is not involved + if (topnFunc == null) return; if (sqlDigest.aggregations.size() != 1) { throw new IllegalStateException("When query with topN, only one metrics is allowed."); } - FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next(); - if (functionDesc.isSum() == false) { + FunctionDesc origFunc = sqlDigest.aggregations.iterator().next(); + if (origFunc.isSum() == false) { throw new IllegalStateException("When query with topN, only SUM function is allowed."); } - FunctionDesc rewriteFunction = null; - // replace the SUM to the TopN function - for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { - if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) { - rewriteFunction = measureDesc.getFunction(); - break; - } - } - - if (rewriteFunction == null) { - throw new IllegalStateException("Didn't find topN measure for " + functionDesc); - } - - sqlDigest.aggregations = Lists.newArrayList(rewriteFunction); - logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction); + sqlDigest.aggregations = Lists.newArrayList(topnFunc); + sqlDigest.groupbyColumns.remove(topnLiteralCol); + sqlDigest.metricColumns.add(topnLiteralCol); + logger.info("Rewrite function " + origFunc + " to " + topnFunc); } - - private TblColRef extractTopNCol(Collection<TblColRef> colRefs) { - for (TblColRef colRef : colRefs) { - if (topNColumns.contains(colRef)) { - return colRef; - } - } - - return null; - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java 
---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java index 831cadb..0983689 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; @@ -58,7 +59,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private ITuple next; public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, // - Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, // + Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, // StorageContext context, TupleInfo returnTupleInfo) { this.context = context; @@ -67,14 +68,9 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size()); Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges); - boolean useTopN = topNCol != null; + for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) { - CubeSegmentTupleIterator segIter; - if (useTopN) - segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), 
entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo); - else - segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo); - this.segmentIteratorList.add(segIter); + this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo)); } this.segmentIteratorIterator = this.segmentIteratorList.iterator(); @@ -85,6 +81,16 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { } } + private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) { + MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders); + if (topN != null) { + TblColRef topNCol = topN.getFunction().getTopNLiteralColumn(); + return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo); + } else { + return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo); + } + } + private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) { Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap(); for (HBaseKeyRange range : segmentKeyRanges) {