http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index e399a70..0ba6566 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -21,15 +21,14 @@ package org.apache.kylin.dict; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.*; -import com.google.common.collect.Lists; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.*; import org.apache.kylin.dict.lookup.ReadableTable; -import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +42,7 @@ public class DictionaryGenerator { private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class); - private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; + private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; private static int getDictionaryMaxCardinality() { try { @@ -53,20 +52,20 @@ public class DictionaryGenerator { } } - public static Dictionary<?> buildDictionaryFromValueEnumerator(DictionaryInfo info, IDictionaryValueEnumerator valueEnumerator) throws IOException{ - Dictionary dict = null; + public static org.apache.kylin.common.util.Dictionary<?> buildDictionaryFromValueEnumerator(DictionaryInfo info, IDictionaryValueEnumerator valueEnumerator) throws IOException{ + org.apache.kylin.common.util.Dictionary dict = null; int baseId = 0; // always 0 for now - final int nSamples = 5; - ArrayList samples = Lists.newArrayListWithCapacity(nSamples); + final int nSamples = 5; + ArrayList samples = Lists.newArrayListWithCapacity(nSamples); // build dict, case by data type DataType dataType = DataType.getInstance(info.getDataType()); if (dataType.isDateTimeFamily()) - dict = buildDateStrDict(valueEnumerator, baseId, nSamples, samples); + dict = buildDateStrDict(valueEnumerator, baseId, nSamples, samples); else if (dataType.isNumberFamily()) - dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples); + dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples); else - dict = buildStringDict(valueEnumerator, baseId, nSamples, samples); + dict = buildStringDict(valueEnumerator, baseId, nSamples, samples); // log a few samples StringBuilder buf = new StringBuilder(); @@ -76,54 +75,54 @@ public class DictionaryGenerator { buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s)); } logger.info("Dictionary value samples: " + buf.toString()); - logger.info("Dictionary cardinality: " + dict.getSize()); + logger.info("Dictionary cardinality: " + dict.getSize()); - if (dict instanceof TrieDictionary && dict.getSize() > DICT_MAX_CARDINALITY) - throw new IllegalArgumentException("Too high cardinality is not suitable for dictionary -- " + info.getSourceTable() + "." + info.getSourceColumn() + " cardinality: " + dict.getSize()); + if (dict instanceof TrieDictionary && dict.getSize() > DICT_MAX_CARDINALITY) + throw new IllegalArgumentException("Too high cardinality is not suitable for dictionary -- " + info.getSourceTable() + "." + info.getSourceColumn() + " cardinality: " + dict.getSize()); return dict; } - public static Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) throws IOException { - return buildDictionaryFromValueEnumerator(targetInfo, new MultipleDictionaryValueEnumerator(sourceDicts)); + public static org.apache.kylin.common.util.Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) throws IOException { + return buildDictionaryFromValueEnumerator(targetInfo, new MultipleDictionaryValueEnumerator(sourceDicts)); } - public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException { + public static org.apache.kylin.common.util.Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException { // currently all data types are casted to string to build dictionary // String dataType = info.getDataType(); - IDictionaryValueEnumerator columnValueEnumerator = null; - try { - logger.info("Building dictionary " + JsonUtil.writeValueAsString(info)); + IDictionaryValueEnumerator columnValueEnumerator = null; + try { + logger.info("Building dictionary " + JsonUtil.writeValueAsString(info)); - columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex()); - return buildDictionaryFromValueEnumerator(info, columnValueEnumerator); - } finally { - if (columnValueEnumerator != null) - columnValueEnumerator.close(); - } + columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex()); + return buildDictionaryFromValueEnumerator(info, columnValueEnumerator); + } finally { + if (columnValueEnumerator != null) + columnValueEnumerator.close(); + } } - private static Dictionary buildDateStrDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { + private static org.apache.kylin.common.util.Dictionary buildDateStrDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { final int BAD_THRESHOLD = 2; String matchPattern = null; - byte[] value; + byte[] value; for (String ptn : DATE_PATTERNS) { matchPattern = ptn; // be optimistic int badCount = 0; SimpleDateFormat sdf = new SimpleDateFormat(ptn); - - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); + + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value.length == 0) continue; String str = Bytes.toString(value); try { sdf.parse(str); - if (samples.size() < nSamples && !samples.contains(str)) + if (samples.size() < nSamples && !samples.contains(str)) samples.add(str); } catch (ParseException e) { logger.info("Unrecognized datetime value: " + str); @@ -134,33 +133,33 @@ public class DictionaryGenerator { } } } - if (matchPattern != null) { + if (matchPattern != null) { return new DateStrDictionary(matchPattern, baseId); - } + } } throw new IllegalStateException("Unrecognized datetime value"); } - private static Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { + private static org.apache.kylin.common.util.Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new StringBytesConverter()); - byte[] value; - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); + byte[] value; + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value == null) continue; String v = Bytes.toString(value); builder.addValue(v); - if (samples.size() < nSamples && !samples.contains(v)) + if (samples.size() < nSamples && !samples.contains(v)) samples.add(v); } return builder.build(baseId); } - private static Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { + private static org.apache.kylin.common.util.Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new StringBytesConverter()); - byte[] value; - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); + byte[] value; + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value == null) continue; String v = Bytes.toString(value); @@ -168,7 +167,7 @@ public class DictionaryGenerator { continue; builder.addValue(v); - if (samples.size() < nSamples && !samples.contains(v)) + if (samples.size() < nSamples && !samples.contains(v)) samples.add(v); } return builder.build(baseId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java index 0d6c9b1..645722c 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java @@ -20,6 +20,7 @@ package org.apache.kylin.dict; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.lookup.ReadableTable.TableSignature; import com.fasterxml.jackson.annotation.JsonAutoDetect; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java index 9ac8206..6381643 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 1c492c4..aa0bc5d 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.dict.lookup.FileTable; import org.apache.kylin.dict.lookup.HiveTable; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java index b877eef..68368b5 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java @@ -18,6 +18,7 @@ package org.apache.kylin.dict; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.TblColRef; /** @@ -27,7 +28,7 @@ public interface ISegment { public abstract int getColumnLength(TblColRef col); - public abstract Dictionary<?> getDictionary(TblColRef col); + public abstract Dictionary<String> getDictionary(TblColRef col); public String getName(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java index 13f7394..43d62a3 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java @@ -20,6 +20,7 @@ package org.apache.kylin.dict; import com.google.common.collect.Lists; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java index 3479dbd..2b6d77d 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java @@ -30,6 +30,7 @@ import java.util.HashMap; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java index 96a7cd6..cc3c637 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java @@ -30,7 +30,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.StringBytesConverter; import org.apache.kylin.dict.TrieDictionary; import org.apache.kylin.dict.TrieDictionaryBuilder; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json index 485170b..fedb2c1 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json @@ -169,7 +169,12 @@ "expression": "COUNT_DISTINCT", "parameter": { "type": "column", - "value": "LSTG_FORMAT_NAME,SELLER_ID" + "value": "LSTG_FORMAT_NAME", + "next_parameter": { + "type": "column", + "value": "SELLER_ID", + "next_parameter": null + } }, "returntype": "hllc(10)" }, http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json index c6bd12f..7e6934b 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json @@ -169,7 +169,12 @@ "expression": "COUNT_DISTINCT", "parameter": { "type": "column", - "value": "LSTG_FORMAT_NAME,SELLER_ID" + "value": "LSTG_FORMAT_NAME", + "next_parameter": { + "type": "column", + "value": "SELLER_ID", + "next_parameter": null + } }, "returntype": "hllc(10)" }, http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java new file mode 100644 index 0000000..4ee8f50 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.LookupDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +/** + */ +public class IICapabilityChecker { + private static final Logger logger = LoggerFactory.getLogger(IICapabilityChecker.class); + + public static CapabilityResult check(IIInstance ii, SQLDigest digest) { + CapabilityResult result = new CapabilityResult(); + result.capable = false; + + // match fact table + if (!digest.factTable.equalsIgnoreCase(ii.getFactTable())) { + logger.info("Exclude II " + ii.getName() + " because fact table unmatch"); + return result; + } + + // match joins + boolean matchJoin = isMatchedWithJoins(digest.joinDescs, ii); + if (!matchJoin) { + logger.info("Exclude II " + ii.getName() + " because unmatched joins"); + return result; + } + + // dimensions & measures + Collection<TblColRef> dimensionColumns = getDimensionColumns(digest); + Collection<FunctionDesc> aggrFunctions = digest.aggregations; + Collection<TblColRef> unmatchedDimensions = unmatchedDimensions(dimensionColumns, ii); + Collection<FunctionDesc> unmatchedAggregations = unmatchedAggregations(aggrFunctions, ii); + + // try dimension-as-measure + if (!unmatchedAggregations.isEmpty()) { + tryDimensionAsMeasures(unmatchedAggregations, digest, ii, result); + } + + if (!unmatchedDimensions.isEmpty()) { + logger.info("Exclude ii " + ii.getName() + " because unmatched dimensions"); + return result; + } + + if (!unmatchedAggregations.isEmpty()) { + logger.info("Exclude ii " + ii.getName() + " because unmatched aggregations"); + return result; + } + + // cost will be minded by caller + result.capable = true; + return result; + } + + private static boolean isMatchedWithJoins(Collection<JoinDesc> joins, IIInstance iiInstance) { + IIDesc iiDesc = iiInstance.getDescriptor(); + List<TableDesc> tables = iiDesc.listTables(); + + List<JoinDesc> cubeJoins = new ArrayList<JoinDesc>(tables.size()); + for (TableDesc tableDesc : tables) { + JoinDesc join = null; + for (LookupDesc lookup : iiDesc.getModel().getLookups()) { + if (lookup.getTable().equalsIgnoreCase(tableDesc.getIdentity())) { + join = lookup.getJoin(); + cubeJoins.add(join); + break; + } + } + } + + for (JoinDesc j : joins) { + // optiq engine can't decide which one is fk or pk + String pTable = j.getPrimaryKeyColumns()[0].getTable(); + String factTable = iiDesc.getModel().getFactTable(); + if (factTable.equals(pTable)) { + j.swapPKFK(); + } + + // check primary key, all PK column should refer to same tale, the Fact Table of iiInstance. + // Using first column's table name to check. + String fTable = j.getForeignKeyColumns()[0].getTable(); + if (!factTable.equals(fTable)) { + logger.info("Fact Table" + factTable + " not matched in join: " + j + " on ii " + iiInstance.getName()); + return false; + } + + // The hashcode() function of JoinDesc has been overwritten, + // which takes into consideration: pk,fk,jointype + if (!cubeJoins.contains(j)) { + logger.info("Query joins don't match on ii " + iiInstance.getName()); + return false; + } + } + return true; + } + + private static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) { + Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns; + Collection<TblColRef> filterColumns = sqlDigest.filterColumns; + + Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>(); + dimensionColumns.addAll(groupByColumns); + dimensionColumns.addAll(filterColumns); + return dimensionColumns; + } + + private static Set<TblColRef> unmatchedDimensions(Collection<TblColRef> dimensionColumns, IIInstance ii) { + HashSet<TblColRef> result = Sets.newHashSet(dimensionColumns); + result.removeAll(ii.getDescriptor().listAllDimensions()); + return result; + } + + private static Set<FunctionDesc> unmatchedAggregations(Collection<FunctionDesc> aggregations, IIInstance ii) { + HashSet<FunctionDesc> result = Sets.newHashSet(aggregations); + result.removeAll(ii.getDescriptor().listAllFunctions()); + return result; + } + + private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, IIInstance ii, CapabilityResult result) { + IIDesc iiDesc = ii.getDescriptor(); + Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions(); + + Iterator<FunctionDesc> it = unmatchedAggregations.iterator(); + while (it.hasNext()) { + FunctionDesc functionDesc = it.next(); + + if (iiFuncs.contains(functionDesc)) { + it.remove(); + continue; + } + + // let calcite handle count + if (functionDesc.isCount()) { + it.remove(); + continue; + } + + // calcite can do aggregation from columns on-the-fly + List<TblColRef> neededCols = functionDesc.getParameter().getColRefs(); + if (neededCols.size() > 0 && iiDesc.listAllDimensions().containsAll(neededCols)) { + result.influences.add(new CapabilityResult.DimensionAsMeasure(functionDesc)); + it.remove(); + continue; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java index b372c70..86b0543 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java @@ -29,6 +29,7 @@ import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; @@ -265,15 +266,16 @@ public class IIInstance extends RootPersistentEntity implements IRealization { } @Override - public boolean isCapable(SQLDigest digest) { - //TODO: currently II is nearly omnipotent - if (!digest.factTable.equalsIgnoreCase(this.getFactTable())) - return false; - - return true; + public CapabilityResult isCapable(SQLDigest digest) { + CapabilityResult result = IICapabilityChecker.check(this, digest); + if (result.capable) { + result.cost = getCost(digest); + } else { + result.cost = -1; + } + return result; } - @Override public int getCost(SQLDigest digest) { return 0; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java index cb5a1cb..f7e70f4 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java @@ -33,7 +33,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.invertedindex.model.IIDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java index 33ff1b0..77a876c 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.ISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.model.IIDesc; @@ -271,7 +271,7 @@ public class IISegment implements Comparable<IISegment>, ISegment { } @Override - public Dictionary<?> getDictionary(TblColRef col) { + public Dictionary<String> getDictionary(TblColRef col) { int index = getTableRecordInfo().findColumn(col); return getTableRecordInfo().dict(index); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java index 2594936..164e2b9 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.roaringbitmap.RoaringBitmap; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java index c58261d..334457c 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java @@ -24,7 +24,7 @@ import java.util.Arrays; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFEncoder; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java index baae663..b9f963e 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java index 11ecdf8..c41a70c 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java @@ -21,7 +21,7 @@ package org.apache.kylin.invertedindex.index; import java.io.IOException; import java.util.List; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.model.IIDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java index da78627..0ed58b0 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java @@ -24,7 +24,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; -import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.datatype.DataType; /** * Created by honma on 11/10/14. http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java index c8597bc..f10712a 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java @@ -207,6 +207,10 @@ public class IIDesc extends RootPersistentEntity { p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc))); f1.setParameter(p1); f1.setReturnType(returnType); + if (f1.isSum() && f1.getReturnDataType().isIntegerFamily()) { + f1.setReturnType("bigint"); + } + measureDesc.setFunction(f1); return measureDesc; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java index 4ad38b5..50af8a4 100644 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.invertedindex.IIDescManager; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java index 5f75b04..cfa4ba6 100644 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.index.BitMapContainer; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java index 470225e..929408a 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java @@ -22,12 +22,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.mr.KylinMapper; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSplitter; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -39,11 +41,13 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +79,8 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text private int errorRecordCounter; private Text outputKey = new Text(); private Text outputValue = new Text(); + protected MeasureIngester<?>[] aggrIngesters; + protected Map<TblColRef, Dictionary<String>> dictionaryMap; private Object[] measures; private byte[][] keyBytesBuf; private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); @@ -116,6 +122,9 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; keyBytesBuf = new byte[colCount][]; + aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); + dictionaryMap = cubeSegment.buildDictionaryMap(); + initNullBytes(); } @@ -153,52 +162,45 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text private void buildValue(SplittedBytes[] splitBuffers) { for (int i = 0; i < measures.length; i++) { - byte[] valueBytes = getValueBytes(splitBuffers, i); - measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes); + measures[i] = buildValueOf(i, splitBuffers); } valueBuf.clear(); measureCodec.encode(measures, valueBuf); } - private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) { - MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx); - FunctionDesc func = desc.getFunction(); - ParameterDesc paramDesc = func.getParameter(); - int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx]; - - byte[] result = null; - - // constant - if (flatTableIdx == null) { - result = Bytes.toBytes(paramDesc.getValue()); - } - // column values - else { - // for multiple columns, their values are joined - for (int i = 0; i < flatTableIdx.length; i++) { - SplittedBytes split = splitBuffers[flatTableIdx[i]]; - if (result == null) { - result = Arrays.copyOf(split.value, split.length); - } else { - byte[] newResult = new byte[result.length + split.length]; - System.arraycopy(result, 0, newResult, 0, result.length); - System.arraycopy(split.value, 0, newResult, result.length, split.length); - result = newResult; - } + private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) { + MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); + FunctionDesc function = measure.getFunction(); + int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; + + int paramCount = function.getParameterCount(); + String[] inputToMeasure = new String[paramCount]; + + // pick up parameter values + ParameterDesc param = function.getParameter(); + int colParamIdx = 0; // index among parameters of column type + for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { + String value; + if (function.isCount()) { + value = "1"; + } else if (param.isColumnType()) { + value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers); + } else { + value = param.getValue(); } + inputToMeasure[i] = value; } - if (func.isCount() || func.isHolisticCountDistinct()) { - // note for holistic count distinct, this value will be ignored - result = ONE; - } - - if (isNull(result)) { - result = null; - } + return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); + } - return result; + private String getCell(int i, SplittedBytes[] splitBuffers) { + byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length); + if (isNull(bytes)) + return null; + else + return Bytes.toString(bytes); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java index 36bf80c..c19e54b 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java @@ -37,7 +37,7 @@ import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import com.google.common.collect.Lists; @@ -93,7 +93,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita } else { // normal (complex) case that distributes measures to multiple // HBase columns - inputCodec.decode(value, inputMeasures); + inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures); for (int i = 0; i < n; i++) { outputValue = keyValueCreators.get(i).create(key, inputMeasures); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java index a74f2a1..bff9e3a 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java @@ -161,9 +161,8 @@ public class CuboidJob extends AbstractHadoopJob { // number of reduce tasks int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio); - // adjust reducer number for cube which has DISTINCT_COUNT measures for - // better performance - if (cubeDesc.hasHolisticCountDistinctMeasures()) { + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { numReduceTasks = numReduceTasks * 4; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java index bcb4b52..4527f30 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java @@ -30,8 +30,8 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.measure.MeasureAggregators; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { aggs.reset(); for (Text value : values) { - codec.decode(value, input); + codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); aggs.aggregate(input); } aggs.collectStates(result); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java index 2516745..2528e07 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java @@ -19,17 +19,22 @@ package org.apache.kylin.job.hadoop.cube; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.Lists; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.mr.KylinMapper; import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -38,10 +43,14 @@ import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -60,6 +69,17 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private CubeSegment sourceCubeSegment;// Must be unique during a mapper's // life cycle + + // for re-encode measures that use dictionary + private List<Pair<Integer, MeasureIngester>> dictMeasures; + private Map<TblColRef, Dictionary<String>> oldDicts; + private Map<TblColRef, Dictionary<String>> newDicts; + private List<MeasureDesc> measureDescs; + private MeasureCodec codec; + private Object[] measureObjs; + private ByteBuffer valueBuf; + private Text outputValue; + private Text outputKey = new Text(); private byte[] newKeyBuf; @@ -133,6 +153,26 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { System.out.println(sourceCubeSegment); this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); + + + measureDescs = cubeDesc.getMeasures(); + codec = new MeasureCodec(measureDescs); + measureObjs = new Object[measureDescs.size()]; + valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); + outputValue = new Text(); + + dictMeasures = Lists.newArrayList(); + for (int i = 0; i < measureDescs.size(); i++) { + MeasureDesc measureDesc = measureDescs.get(i); + MeasureType measureType = measureDesc.getFunction().getMeasureType(); + if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) { + dictMeasures.add(Pair.newPair(i, measureType.newIngester())); + } + } + if (dictMeasures.size() > 0) { + oldDicts = sourceCubeSegment.buildDictionaryMap(); + newDicts = mergedCubeSegment.buildDictionaryMap(); + } } @Override @@ -187,6 +227,21 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset); outputKey.set(newKey, 0, newKey.length); + + // re-encode measures if dictionary is used + if (dictMeasures.size() > 0) { + codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); + for (Pair<Integer, MeasureIngester> pair : dictMeasures) { + int i = pair.getFirst(); + MeasureIngester ingester = pair.getSecond(); + measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); + } + valueBuf.clear(); + codec.encode(measureObjs, valueBuf); + outputValue.set(valueBuf.array(), 0, valueBuf.position()); + value = outputValue; + } + context.write(outputKey, value); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java deleted file mode 100644 index 35b4662..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesSplitter; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.SplittedBytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.DimensionDesc; -import org.apache.kylin.dict.lookup.HiveTable; -import org.apache.kylin.dict.lookup.LookupBytesTable; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.measure.MeasureCodec; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author George Song (ysong1),honma - */ -public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> { - - private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class); - - private String cubeName; - private String segmentName; - private Cuboid baseCuboid; - private CubeInstance cube; - private CubeSegment cubeSegment; - - private CubeDesc cubeDesc; - private MetadataManager metadataManager; - private TableDesc factTableDesc; - - private boolean byteRowDelimiterInferred = false; - private byte byteRowDelimiter; - - private int counter; - private Text outputKey = new Text(); - private Text outputValue = new Text(); - private Object[] measures; - private byte[][] keyBytesBuf; - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - - private BytesSplitter bytesSplitter; - private AbstractRowKeyEncoder rowKeyEncoder; - private MeasureCodec measureCodec; - - // deal with table join - private HashMap<String, LookupBytesTable> lookupTables;// name -> table - private LinkedList<TableJoin> tableJoins; - private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar as - // TableJoin.dimTblColAsRowKey - private int[][] measureColumnIndice; - private byte[] nullValue; - - private class TableJoin { - public LinkedList<Integer> fkIndice;// zero-based join columns on fact - // table - public String lookupTableName; - public String joinType; - - // Pair.first -> zero-based column index in lookup table - // Pair.second -> zero based row key index - public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey; - - private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) { - this.joinType = joinType; - this.fkIndice = fkIndice; - this.lookupTableName = lookupTableName; - this.dimTblColAsRowKey = dimTblColAsRowKey; - } - } - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - metadataManager = MetadataManager.getInstance(config); - cube = CubeManager.getInstance(config).getCube(cubeName); - cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - cubeDesc = cube.getDescriptor(); - factTableDesc = metadataManager.getTableDesc(cubeDesc.getFactTable()); - - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - - // intermediateTableDesc = new - // JoinedFlatTableDesc(cube.getDescriptor()); - - rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); - - measureCodec = new MeasureCodec(cubeDesc.getMeasures()); - measures = new Object[cubeDesc.getMeasures().size()]; - - int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; - keyBytesBuf = new byte[colCount][]; - - bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096); - - nullValue = new byte[] { (byte) '\\', (byte) 'N' };// As in Hive, null - // value is - // represented by \N - - prepareJoins(); - prepareMetrics(); - } - - private void prepareJoins() throws IOException { - this.lookupTables = new HashMap<String, LookupBytesTable>(); - this.tableJoins = new LinkedList<TableJoin>(); - this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>(); - - for (DimensionDesc dim : cubeDesc.getDimensions()) { - JoinDesc join = dim.getJoin(); - if (join != null) { - String joinType = join.getType().toUpperCase(); - String lookupTableName = dim.getTable(); - - // load lookup tables - if (!lookupTables.containsKey(lookupTableName)) { - HiveTable htable = new HiveTable(metadataManager, lookupTableName); - LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable); - lookupTables.put(lookupTableName, btable); - } - - // create join infos - LinkedList<Integer> fkIndice = new LinkedList<Integer>(); - for (TblColRef colRef : join.getForeignKeyColumns()) { - fkIndice.add(colRef.getColumn().getZeroBasedIndex()); - } - this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim))); - - } else { - - this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim)); - } - } - - // put composite keys joins ahead of single key joins - Collections.sort(tableJoins, new Comparator<TableJoin>() { - @Override - public int compare(TableJoin o1, TableJoin o2) { - return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size())); - } - }); - } - - private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) { - LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>(); - for (TblColRef colRef : dim.getColumnRefs()) { - int dimTableIndex = colRef.getColumn().getZeroBasedIndex(); - int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName()); - dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex)); - } - return dimTblColAsRowKey; - } - - private void prepareMetrics() { - List<MeasureDesc> measures = cubeDesc.getMeasures(); - int measureSize = measures.size(); - measureColumnIndice = new int[measureSize][]; - for (int i = 0; i < measureSize; i++) { - FunctionDesc func = measures.get(i).getFunction(); - List<TblColRef> colRefs = func.getParameter().getColRefs(); - if (colRefs == null) { - measureColumnIndice[i] = null; - } else { - measureColumnIndice[i] = new int[colRefs.size()]; - for (int j = 0; j < colRefs.size(); j++) { - TblColRef c = colRefs.get(j); - int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex(); - measureColumnIndice[i][j] = factTblIdx; - } - } - } - } - - private byte[] trimSplitBuffer(SplittedBytes splittedBytes) { - return Arrays.copyOf(splittedBytes.value, splittedBytes.length); - } - - private byte[] buildKey(SplittedBytes[] splitBuffers) { - - int filledDimension = 0;// debug - - // join lookup tables, and fill into RowKey the columns in lookup table - for (TableJoin tableJoin : this.tableJoins) { - String dimTblName = tableJoin.lookupTableName; - LookupBytesTable dimTbl = this.lookupTables.get(dimTblName); - ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()]; - for (int i = 0; i < tableJoin.fkIndice.size(); ++i) { - rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)])); - } - Array<ByteArray> key = new Array<ByteArray>(rawKey); - ByteArray[] dimRow = dimTbl.getRow(key); - if (dimRow == null) { - if (tableJoin.joinType.equalsIgnoreCase("INNER")) { - return null; - } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) { - for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) { - keyBytesBuf[relation.getSecond()] = nullValue; - filledDimension++; - } - } - } else { - for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) { - keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data; - filledDimension++; - } - } - } - - // fill into RowKey the columns in fact table - for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) { - keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]); - filledDimension++; - } - - assert filledDimension == keyBytesBuf.length; - - // all the row key slots(keyBytesBuf) should be complete now - return rowKeyEncoder.encode(keyBytesBuf); - } - - private void buildValue(SplittedBytes[] splitBuffers) { - - for (int i = 0; i < measures.length; i++) { - byte[] valueBytes = getValueBytes(splitBuffers, i); - measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes); - } - - valueBuf.clear(); - measureCodec.encode(measures, valueBuf); - } - - private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) { - MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx); - ParameterDesc paramDesc = desc.getFunction().getParameter(); - int[] flatTableIdx = this.measureColumnIndice[measureIdx]; - - byte[] result = null; - - // constant - if (flatTableIdx == null) { - result = Bytes.toBytes(paramDesc.getValue()); - } - // column values - else { - for (int i = 0; i < flatTableIdx.length; i++) { - SplittedBytes split = splitBuffers[flatTableIdx[i]]; - result = Arrays.copyOf(split.value, split.length); - } - } - - if (desc.getFunction().isCount()) { - result = Bytes.toBytes("1"); - } - - return result; - } - - @Override - public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { - // combining the hive table flattening logic into base cuboid building. - // the input of this mapper is the fact table rows - - counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { - logger.info("Handled " + counter + " records!"); - } - - if (!byteRowDelimiterInferred) - byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length); - - bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); - - try { - byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers()); - if (rowKey == null) - return;// skip this fact table row - - outputKey.set(rowKey, 0, rowKey.length); - - buildValue(bytesSplitter.getSplitBuffers()); - outputValue.set(valueBuf.array(), 0, valueBuf.position()); - - context.write(outputKey, outputValue); - - } catch (Throwable t) { - logger.error("", t); - context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Error records").increment(1L); - return; - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java index c9988fc..fa7e51b 100644 --- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java +++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java @@ -45,7 +45,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java index f100490..58d093a 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.List; import org.apache.commons.io.FileUtils; @@ -36,7 +37,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowKeyDecoder; import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.junit.After; import org.junit.Before; @@ -100,7 +101,7 @@ public class BaseCuboidMapperTest extends LocalFileMetadataTestCase { private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String... valueStr) { MeasureCodec codec = new MeasureCodec(measures); Object[] values = new Object[measures.size()]; - codec.decode(valueBytes, values); + codec.decode(ByteBuffer.wrap(valueBytes.getBytes()), values); assertTrue(new BigDecimal(valueStr[0]).equals(values[0])); assertTrue(new BigDecimal(valueStr[1]).equals(values[1])); assertTrue(new BigDecimal(valueStr[2]).equals(values[2])); @@ -138,6 +139,6 @@ public class BaseCuboidMapperTest extends LocalFileMetadataTestCase { assertEquals(511, Bytes.toLong(cuboidId)); assertEquals(22, restKey.length); - verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0", "1","22"); + verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0", "1", "22"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java index 867faa6..c9b7eba 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java @@ -34,7 +34,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureCodec; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java index 837f759..16cfa89 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java @@ -37,7 +37,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureCodec; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java index 9a1fdfb..ea5c163 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java @@ -33,7 +33,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java index 7ce6ee9..d60ec67 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java @@ -24,8 +24,11 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.List; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; @@ -37,6 +40,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; + public class NDCuboidMapperTest extends LocalFileMetadataTestCase { MapReduceDriver<Text, Text, Text, Text, Text, Text> mapReduceDriver; @@ -81,7 +86,17 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue)); - assertTrue(result.contains(output1)); + + //As we will truncate decimal(KYLIN-766), value will no longer equals to resultValue + Collection<Text> keys = Collections2.transform(result, new Function<Pair<Text, Text>, Text>() { + @Nullable + @Override + public Text apply(Pair<Text, Text> input) { + return input.getFirst(); + } + }); + assertTrue(keys.contains(output1.getFirst())); + assertTrue(!result.contains(output1)); long[] keySet = new long[result.size()]; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java new file mode 100644 index 0000000..7b74225 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.io.Serializable; + +import org.apache.kylin.metadata.datatype.DataType; + +/** + */ +@SuppressWarnings("serial") +abstract public class MeasureAggregator<V> implements Serializable { + + public static MeasureAggregator<?> create(String funcName, DataType dataType) { + return MeasureTypeFactory.create(funcName, dataType).newAggregator(); + } + + public static int guessBigDecimalMemBytes() { + // 116 returned by AggregationCacheMemSizeTest + return 8 // aggregator obj shell + + 8 // ref to BigDecimal + + 8 // BigDecimal obj shell + + 100; // guess of BigDecimal internal + } + + public static int guessDoubleMemBytes() { + // 29 to 44 returned by AggregationCacheMemSizeTest + return 44; + /* + return 8 // aggregator obj shell + + 8 // ref to DoubleWritable + + 8 // DoubleWritable obj shell + + 8; // size of double + */ + } + + public static int guessLongMemBytes() { + // 29 to 44 returned by AggregationCacheMemSizeTest + return 44; + /* + return 8 // aggregator obj shell + + 8 // ref to LongWritable + + 8 // LongWritable obj shell + + 8; // size of long + */ + } + + // ============================================================================ + + @SuppressWarnings("rawtypes") + public void setDependentAggregator(MeasureAggregator agg) { + } + + abstract public void reset(); + + abstract public void aggregate(V value); + + abstract public V getState(); + + // get an estimate of memory consumption UPPER BOUND + abstract public int getMemBytesEstimate(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java new file mode 100644 index 0000000..12832ff --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + */ +@SuppressWarnings({ "rawtypes", "unchecked", "serial" }) +public class MeasureAggregators implements Serializable { + + private final MeasureAggregator[] aggs; + private final int descLength; + + public MeasureAggregators(Collection<MeasureDesc> measureDescs) { + this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); + } + + public MeasureAggregators(MeasureDesc... measureDescs) { + descLength = measureDescs.length; + aggs = new MeasureAggregator[descLength]; + + Map<String, Integer> measureIndexMap = new HashMap<String, Integer>(); + for (int i = 0; i < descLength; i++) { + FunctionDesc func = measureDescs[i].getFunction(); + aggs[i] = func.getMeasureType().newAggregator(); + measureIndexMap.put(measureDescs[i].getName(), i); + } + // fill back dependent aggregator + for (int i = 0; i < descLength; i++) { + String depMsrRef = measureDescs[i].getDependentMeasureRef(); + if (depMsrRef != null) { + int index = measureIndexMap.get(depMsrRef); + aggs[i].setDependentAggregator(aggs[index]); + } + } + } + + public void reset() { + for (int i = 0; i < aggs.length; i++) { + aggs[i].reset(); + } + } + + public void aggregate(Object[] values) { + assert values.length == descLength; + + for (int i = 0; i < descLength; i++) { + aggs[i].aggregate(values[i]); + } + } + + public void collectStates(Object[] states) { + for (int i = 0; i < descLength; i++) { + states[i] = aggs[i].getState(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java new file mode 100644 index 0000000..6209079 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + * @author yangli9 + * + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class MeasureCodec { + + int nMeasures; + DataTypeSerializer[] serializers; + + public MeasureCodec(Collection<MeasureDesc> measureDescs) { + this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); + } + + public MeasureCodec(MeasureDesc... measureDescs) { + String[] dataTypes = new String[measureDescs.length]; + for (int i = 0; i < dataTypes.length; i++) { + dataTypes[i] = measureDescs[i].getFunction().getReturnType(); + } + init(dataTypes); + } + + public MeasureCodec(String... dataTypes) { + init(dataTypes); + } + + private void init(String[] dataTypes) { + nMeasures = dataTypes.length; + serializers = new DataTypeSerializer[nMeasures]; + + for (int i = 0; i < nMeasures; i++) { + serializers[i] = DataTypeSerializer.create(dataTypes[i]); + } + } + + public DataTypeSerializer getSerializer(int idx) { + return serializers[idx]; + } + + public void decode(ByteBuffer buf, Object[] result) { + assert result.length == nMeasures; + for (int i = 0; i < nMeasures; i++) { + result[i] = serializers[i].deserialize(buf); + } + } + + public void encode(Object[] values, ByteBuffer out) { + assert values.length == nMeasures; + for (int i = 0; i < nMeasures; i++) { + serializers[i].serialize(values[i], out); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java new file mode 100644 index 0000000..0076252 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.util.Collection; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +abstract public class MeasureIngester<V> { + + public static MeasureIngester<?> create(MeasureDesc measure) { + return measure.getFunction().getMeasureType().newIngester(); + } + + public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) { + MeasureIngester<?>[] result = new MeasureIngester<?>[measures.size()]; + int i = 0; + for (MeasureDesc measure : measures) { + result[i++] = create(measure); + } + return result; + } + + abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap); + + public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { + throw new UnsupportedOperationException(); + } +}