http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index fd86138..f225fe2 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc; import com.google.common.base.Preconditions; @@ -74,7 +73,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { AGGR_FUNC_MAP.put("$SUM0", "SUM"); AGGR_FUNC_MAP.put("COUNT", "COUNT"); AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT"); - AGGR_FUNC_MAP.put("HLL_COUNT", "COUNT_DISTINCT"); AGGR_FUNC_MAP.put("MAX", "MAX"); AGGR_FUNC_MAP.put("MIN", "MIN"); } @@ -301,10 +299,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { // rebuild function RelDataType fieldType = aggCall.getType(); SqlAggFunction newAgg = aggCall.getAggregation(); - if (func.isCountDistinct()) { - newAgg = createHyperLogLogAggFunction(fieldType); - } else if (func.isCount()) { + if (func.isCount()) { newAgg = SqlStdOperatorTable.SUM0; + } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) { + newAgg = createCustomAggFunction(func.getExpression(), fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass()); } // rebuild aggregate call @@ -312,10 +310,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { return newAggCall; } - private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) { + private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) { RelDataTypeFactory typeFactory = getCluster().getTypeFactory(); - SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1)); - AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class); + SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1)); + AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz); List<RelDataType> argTypes = new ArrayList<RelDataType>(); List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>(); for (FunctionParameter o : aggFunction.getParameters()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/Candidate.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java new file mode 100644 index 0000000..22608a0 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.query.routing; + +import java.util.Map; + +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.storage.hybrid.HybridInstance; + +import com.google.common.collect.Maps; + +public class Candidate implements Comparable<Candidate> { + + static final Map<RealizationType, Integer> PRIORITIES = Maps.newHashMap(); + + static { + PRIORITIES.put(RealizationType.HYBRID, 0); + PRIORITIES.put(RealizationType.CUBE, 0); + PRIORITIES.put(RealizationType.INVERTED_INDEX, 1); + } + + /** for test only */ + public static void setPriorities(Map<RealizationType, Integer> priorities) { + PRIORITIES.clear(); + PRIORITIES.putAll(priorities); + } + + // ============================================================================ + + IRealization realization; + SQLDigest sqlDigest; + int priority; + CapabilityResult capability; + + public Candidate(IRealization realization, SQLDigest sqlDigest) { + this.realization = realization; + this.sqlDigest = sqlDigest; + this.priority = PRIORITIES.get(realization.getType()); + } + + public IRealization getRealization() { + return realization; + } + + public SQLDigest getSqlDigest() { + return sqlDigest; + } + + public int getPriority() { + return priority; + } + + public CapabilityResult getCapability() { + return capability; + } + + public void setCapability(CapabilityResult capability) { + this.capability = capability; + } + + @Override + public int compareTo(Candidate o) { + int comp = this.priority - o.priority; + if (comp != 0) { + return comp; + } + + comp = this.capability.cost - o.capability.cost; + if (comp != 0) { + return comp; + } + + if (this.realization instanceof HybridInstance) + return -1; + else if (o.realization instanceof HybridInstance) + return 1; + else + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java index 7fdc725..7493e08 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java +++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java @@ -6,23 +6,30 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package org.apache.kylin.query.routing; import java.util.List; +import java.util.Set; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; +import org.apache.kylin.metadata.realization.CapabilityResult.DimensionAsMeasure; import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.query.relnode.OLAPContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,22 +46,54 @@ public class QueryRouter { public static IRealization selectRealization(OLAPContext olapContext) throws NoRealizationFoundException { ProjectManager prjMgr = ProjectManager.getInstance(olapContext.olapSchema.getConfig()); + logger.info("The project manager's reference is " + prjMgr); String factTableName = olapContext.firstTableScan.getTableName(); String projectName = olapContext.olapSchema.getProjectName(); - List<IRealization> realizations = Lists.newArrayList(prjMgr.getRealizationsByTable(projectName, factTableName)); - logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(realizations, ",")); + Set<IRealization> realizations = prjMgr.getRealizationsByTable(projectName, factTableName); + SQLDigest sqlDigest = olapContext.getSQLDigest(); + + List<Candidate> candidates = Lists.newArrayListWithCapacity(realizations.size()); + for (IRealization real : realizations) { + if (real.isReady()) + candidates.add(new Candidate(real, sqlDigest)); + } - //rule based realization selection, rules might reorder realizations or remove specific realization - RoutingRule.applyRules(realizations, olapContext); + logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(candidates, ",")); - if (realizations.size() == 0) { - throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + olapContext.getSQLDigest().toString()); + // rule based realization selection, rules might reorder realizations or remove specific realization + RoutingRule.applyRules(candidates); + + if (candidates.size() == 0) { + throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + sqlDigest.toString()); } + Candidate chosen = candidates.get(0); + adjustForDimensionAsMeasure(chosen, olapContext); + logger.info("The realizations remaining: "); - logger.info(RoutingRule.getPrintableText(realizations)); - logger.info("The realization being chosen: " + realizations.get(0).getName()); + logger.info(RoutingRule.getPrintableText(candidates)); + logger.info("The realization being chosen: " + chosen.realization.getName()); - return realizations.get(0); + return chosen.realization; } + + private static void adjustForDimensionAsMeasure(Candidate chosen, OLAPContext olapContext) { + CapabilityResult capability = chosen.getCapability(); + for (CapabilityInfluence inf : capability.influences) { + // convert the metric to dimension + if (inf instanceof DimensionAsMeasure) { + FunctionDesc functionDesc = ((DimensionAsMeasure) inf).getMeasureFunction(); + functionDesc.setDimensionAsMetric(true); + olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName()); + for (TblColRef col : functionDesc.getParameter().getColRefs()) { + if (col != null) { + olapContext.metricsColumns.remove(col); + olapContext.groupByColumns.add(col); + } + } + logger.info("Adjust DimensionAsMeasure for " + functionDesc); + } + } + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java index 230950e..715f6d1 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java +++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package org.apache.kylin.query.routing; @@ -23,17 +23,14 @@ import java.util.List; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRules.AdjustForWeeklyMatchedRealization; -import org.apache.kylin.query.routing.RoutingRules.RealizationSortRule; -import org.apache.kylin.query.routing.RoutingRules.RemoveUncapableRealizationsRule; +import org.apache.kylin.query.routing.rules.RealizationSortRule; +import org.apache.kylin.query.routing.rules.RemoveUncapableRealizationsRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; /** - * Created by Hongbin Ma(Binmahone) on 1/5/15. */ public abstract class RoutingRule { private static final Logger logger = LoggerFactory.getLogger(QueryRouter.class); @@ -45,26 +42,23 @@ public abstract class RoutingRule { static { rules.add(new RemoveUncapableRealizationsRule()); rules.add(new RealizationSortRule()); - rules.add(new AdjustForWeeklyMatchedRealization());//this rule might modify olapcontext content, better put it at last } - public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) { + public static void applyRules(List<Candidate> candidates) { for (RoutingRule rule : rules) { - logger.info("Initial realizations order:"); - logger.info(getPrintableText(realizations)); - logger.info("Applying rule " + rule); - - rule.apply(realizations, olapContext); - - logger.info(getPrintableText(realizations)); + logger.info("Realizations order before: " + getPrintableText(candidates)); + logger.info("Applying rule : " + rule); + rule.apply(candidates); + logger.info("Realizations order after: " + getPrintableText(candidates)); logger.info("==================================================="); } } - public static String getPrintableText(List<IRealization> realizations) { + public static String getPrintableText(List<Candidate> candidates) { StringBuffer sb = new StringBuffer(); sb.append("["); - for (IRealization r : realizations) { + for (Candidate candidate : candidates) { + IRealization r = candidate.realization; sb.append(r.getName()); sb.append(","); } @@ -112,6 +106,6 @@ public abstract class RoutingRule { return this.getClass().toString(); } - public abstract void apply(List<IRealization> realizations, OLAPContext olapContext); + public abstract void apply(List<Candidate> candidates); } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java deleted file mode 100644 index f457c7d..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.kylin.cube.CubeCapabilityChecker; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; -import org.apache.kylin.storage.hybrid.HybridInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class AdjustForWeeklyMatchedRealization extends RoutingRule { - private static final Logger logger = LoggerFactory.getLogger(AdjustForWeeklyMatchedRealization.class); - - @Override - public void apply(List<IRealization> realizations, OLAPContext olapContext) { - if (realizations.size() > 0) { - IRealization first = realizations.get(0); - - if (first instanceof HybridInstance) { - HybridInstance hybrid = (HybridInstance) first; - - if (hybrid.getRealizations()[0] instanceof CubeInstance) - first = hybrid.getRealizations()[0]; - } - - if (first instanceof CubeInstance) { - CubeInstance cube = (CubeInstance) first; - adjustOLAPContextIfNecessary(cube, olapContext); - } - - if (first instanceof IIInstance) { - IIInstance ii = (IIInstance) first; - adjustOLAPContextIfNecessary(ii, olapContext); - } - } - } - - private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) { - IIDesc iiDesc = ii.getDescriptor(); - Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions(); - convertAggregationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName()); - } - - private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) { - if (CubeCapabilityChecker.check(cube, olapContext.getSQLDigest(), false)) - return; - - CubeDesc cubeDesc = cube.getDescriptor(); - Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions(); - convertAggregationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable()); - } - - private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) { - Iterator<FunctionDesc> it = olapContext.aggregations.iterator(); - while (it.hasNext()) { - FunctionDesc functionDesc = it.next(); - if (!availableAggregations.contains(functionDesc)) { - // try to convert the metric to dimension to see if it works - TblColRef col = functionDesc.getParameter().getColRefs().get(0); - functionDesc.setDimensionAsMetric(true); - olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName()); - if (col != null) { - olapContext.metricsColumns.remove(col); - olapContext.groupByColumns.add(col); - } - logger.info("Adjust OLAPContext for " + functionDesc); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java deleted file mode 100644 index 1271344..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Comparator; -import java.util.List; - -import org.apache.kylin.common.util.PartialSorter; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; - -/** - * Created by Hongbin Ma(Binmahone) on 1/5/15. - */ -public class CubesSortRule extends RoutingRule { - @Override - public void apply(List<IRealization> realizations, OLAPContext olapContext) { - - // sort cube candidates, 0) the cost indicator, 1) the lesser header - // columns the better, 2) the lesser body columns the better - List<Integer> items = super.findRealizationsOf(realizations, RealizationType.CUBE); - PartialSorter.partialSort(realizations, items, new Comparator<IRealization>() { - @Override - public int compare(IRealization o1, IRealization o2) { - CubeInstance c1 = (CubeInstance) o1; - CubeInstance c2 = (CubeInstance) o2; - int comp = 0; - comp = c1.getCost() - c2.getCost(); - if (comp != 0) { - return comp; - } - - CubeDesc schema1 = c1.getDescriptor(); - CubeDesc schema2 = c2.getDescriptor(); - - comp = schema1.listDimensionColumnsIncludingDerived().size() - schema2.listDimensionColumnsIncludingDerived().size(); - if (comp != 0) - return comp; - - comp = schema1.getMeasures().size() - schema2.getMeasures().size(); - return comp; - } - }); - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java deleted file mode 100644 index 7513efd..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; - -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; - -import com.google.common.collect.Maps; - -/** - * Created by Hongbin Ma(Binmahone) on 1/5/15. - */ -public class RealizationPriorityRule extends RoutingRule { - static Map<RealizationType, Integer> priorities = Maps.newHashMap(); - - static { - priorities.put(RealizationType.CUBE, 1); - priorities.put(RealizationType.HYBRID, 1); - priorities.put(RealizationType.INVERTED_INDEX, 2); - } - - public static void setPriorities(Map<RealizationType, Integer> priorities) { - RealizationPriorityRule.priorities = priorities; - } - - public void apply(List<IRealization> realizations, OLAPContext olapContext) { - - Collections.sort(realizations, new Comparator<IRealization>() { - @Override - public int compare(IRealization o1, IRealization o2) { - int i1 = priorities.get(o1.getType()); - int i2 = priorities.get(o2.getType()); - return i1 - i2; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java deleted file mode 100644 index 8a1a228..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; -import org.apache.kylin.storage.hybrid.HybridInstance; - -/** - */ -public class RealizationSortRule extends RoutingRule { - @Override - public void apply(List<IRealization> realizations, final OLAPContext olapContext) { - - // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header - // columns the better, 3) the lesser body columns the better 4) the larger date range the better - - Collections.sort(realizations, new Comparator<IRealization>() { - @Override - public int compare(IRealization o1, IRealization o2) { - int i1 = RealizationPriorityRule.priorities.get(o1.getType()); - int i2 = RealizationPriorityRule.priorities.get(o2.getType()); - int comp = i1 - i2; - if (comp != 0) { - return comp; - } - - comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest()); - if (comp != 0) { - return comp; - } - - if (o1 instanceof HybridInstance) - return -1; - else if (o2 instanceof HybridInstance) - return 1; - - return 0; - } - }); - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java deleted file mode 100644 index 40b0491..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Iterator; -import java.util.List; - -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; - -/** - * Created by Hongbin Ma(Binmahone) on 1/5/15. - */ -public class RemoveUncapableRealizationsRule extends RoutingRule { - @Override - public void apply(List<IRealization> realizations, OLAPContext olapContext) { - for (Iterator<IRealization> iterator = realizations.iterator(); iterator.hasNext();) { - IRealization realization = iterator.next(); - if (!realization.isCapable(olapContext.getSQLDigest())) { - iterator.remove(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java deleted file mode 100644 index b3628b3..0000000 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.routing.RoutingRules; - -import java.util.Comparator; -import java.util.List; - -import org.apache.kylin.common.util.PartialSorter; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.relnode.OLAPContext; -import org.apache.kylin.query.routing.RoutingRule; - -/** - * Created by Hongbin Ma(Binmahone) on 1/5/15. - */ -public class SimpleQueryMoreColumnsCubeFirstRule extends RoutingRule { - @Override - public void apply(List<IRealization> realizations, OLAPContext olapContext) { - List<Integer> itemIndexes = super.findRealizationsOf(realizations, RealizationType.CUBE); - - if (olapContext.isSimpleQuery()) { - PartialSorter.partialSort(realizations, itemIndexes, new Comparator<IRealization>() { - @Override - public int compare(IRealization o1, IRealization o2) { - CubeInstance c1 = (CubeInstance) o1; - CubeInstance c2 = (CubeInstance) o2; - return c1.getDescriptor().listDimensionColumnsIncludingDerived().size() - c2.getDescriptor().listDimensionColumnsIncludingDerived().size(); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java new file mode 100644 index 0000000..d3c67d7 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.query.routing.rules; + +import java.util.Collections; +import java.util.List; + +import org.apache.kylin.query.routing.Candidate; +import org.apache.kylin.query.routing.RoutingRule; + +/** + */ +public class RealizationSortRule extends RoutingRule { + @Override + public void apply(List<Candidate> candidates) { + Collections.sort(candidates); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java new file mode 100644 index 0000000..576b47f --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.query.routing.rules; + +import java.util.Iterator; +import java.util.List; + +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.query.routing.Candidate; +import org.apache.kylin.query.routing.RoutingRule; + +/** + */ +public class RemoveUncapableRealizationsRule extends RoutingRule { + @Override + public void apply(List<Candidate> candidates) { + for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) { + Candidate candidate = iterator.next(); + + CapabilityResult capability = candidate.getRealization().isCapable(candidate.getSqlDigest()); + if (capability.capable) + candidate.setCapability(capability); + else + iterator.remove(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java index bbf1024..8b1ad29 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java @@ -175,7 +175,7 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab metFields.add(fieldName); ColumnDesc fakeCountCol = new ColumnDesc(); fakeCountCol.setName(fieldName); - fakeCountCol.setDatatype(func.getSQLType().toString()); + fakeCountCol.setDatatype(func.getRewriteFieldType().toString()); if (func.isCount()) fakeCountCol.setNullable(false); fakeCountCol.init(sourceTable); @@ -188,11 +188,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab HashSet<ColumnDesc> updateColumns = Sets.newHashSet(); for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity(), false)) { if (m.getFunction().isSum()) { - FunctionDesc functionDesc = m.getFunction(); - if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && // - functionDesc.getReturnDataType().isBigInt() && // - functionDesc.getSQLType().isIntegerFamily()) { - updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc()); + FunctionDesc func = m.getFunction(); + if (//func.getReturnDataType() != func.getRewriteFieldType() && // + func.getReturnDataType().isBigInt() && // + func.getRewriteFieldType().isIntegerFamily()) { + updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc()); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java deleted file mode 100644 index 9716ff6..0000000 --- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.query.sqlfunc; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xjiang - */ -public class HLLDistinctCountAggFunc { - - private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); - - public static HyperLogLogPlusCounter init() { - return null; - } - - public static HyperLogLogPlusCounter initAdd(Object v) { - if (v instanceof Long) { // holistic case - long l = (Long) v; - return new FixedValueHLLCMockup(l); - } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; - return new HyperLogLogPlusCounter(c); - } - } - - public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) { - if (v instanceof Long) { // holistic case - long l = (Long) v; - if (counter == null) { - return new FixedValueHLLCMockup(l); - } else { - if (!(counter instanceof FixedValueHLLCMockup)) - throw new IllegalStateException("counter is not FixedValueHLLCMockup"); - - ((FixedValueHLLCMockup) counter).set(l); - return counter; - } - } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; - if (counter == null) { - return new HyperLogLogPlusCounter(c); - } else { - counter.merge(c); - return counter; - } - } - } - - public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) { - return add(counter0, counter1); - } - - public static long result(HyperLogLogPlusCounter counter) { - return counter == null ? 0L : counter.getCountEstimate(); - } - - private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter { - - private Long value = null; - - FixedValueHLLCMockup(long value) { - this.value = value; - } - - public void set(long value) { - if (this.value == null) { - this.value = value; - } else { - long oldValue = Math.abs(this.value.longValue()); - long take = Math.max(oldValue, value); - logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take); - this.value = -take; // make it obvious that this value is wrong - } - } - - @Override - public void clear() { - this.value = null; - } - - @Override - protected void add(long hash) { - throw new UnsupportedOperationException(); - } - - @Override - public void merge(HyperLogLogPlusCounter another) { - throw new UnsupportedOperationException(); - } - - @Override - public long getCountEstimate() { - return value; - } - - @Override - public void writeRegisters(ByteBuffer out) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void readRegisters(ByteBuffer in) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + (int) (value ^ (value >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (!super.equals(obj)) - return false; - if (getClass() != obj.getClass()) - return false; - FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj; - if (!value.equals(other.value)) - return false; - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java index 9356663..e8c03ae 100644 --- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java @@ -21,7 +21,7 @@ package org.apache.kylin.query.test; import java.util.Map; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule; +import org.apache.kylin.query.routing.Candidate; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -35,10 +35,10 @@ public class IIQueryTest extends KylinQueryTest { KylinQueryTest.setUp();//invoke super class Map<RealizationType, Integer> priorities = Maps.newHashMap(); - priorities.put(RealizationType.INVERTED_INDEX, 1); - priorities.put(RealizationType.CUBE, 2); - priorities.put(RealizationType.HYBRID, 2); - RealizationPriorityRule.setPriorities(priorities); + priorities.put(RealizationType.INVERTED_INDEX, 0); + priorities.put(RealizationType.CUBE, 1); + priorities.put(RealizationType.HYBRID, 1); + Candidate.setPriorities(priorities); } @@ -47,10 +47,10 @@ public class IIQueryTest extends KylinQueryTest { KylinQueryTest.tearDown();//invoke super class Map<RealizationType, Integer> priorities = Maps.newHashMap(); - priorities.put(RealizationType.INVERTED_INDEX, 2); - priorities.put(RealizationType.CUBE, 1); - priorities.put(RealizationType.HYBRID, 1); - RealizationPriorityRule.setPriorities(priorities); + priorities.put(RealizationType.INVERTED_INDEX, 1); + priorities.put(RealizationType.CUBE, 0); + priorities.put(RealizationType.HYBRID, 0); + Candidate.setPriorities(priorities); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java index 84f1042..f9e575b 100644 --- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java @@ -150,7 +150,7 @@ public class KylinQueryTest extends KylinTestBase { @Test public void testSingleExecuteQuery() throws Exception { - String queryFileName = "src/test/resources/query/sql/query39.sql"; + String queryFileName = "src/test/resources/query/sql/query00.sql"; File sqlFile = new File(queryFileName); String sql = getTextFromFile(sqlFile); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java index 4ca9b47..95a11f8 100644 --- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java +++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java @@ -209,7 +209,7 @@ public class KylinTestBase { if (needSort) { queryTable = new SortedTable(queryTable, columnNames); } - // printResult(queryTable); + printResult(queryTable); return queryTable; } @@ -265,7 +265,7 @@ public class KylinTestBase { if (needSort) { queryTable = new SortedTable(queryTable, columnNames); } - // printResult(queryTable); + printResult(queryTable); return queryTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/server/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java index 25f245b..0903f83 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeBuildTypeEnum; @@ -143,13 +142,8 @@ public class JobService extends BasicService { builder.setSubmitter(submitter); if (buildType == CubeBuildTypeEnum.BUILD) { - if (cube.getDescriptor().hasHolisticCountDistinctMeasures() && cube.getSegments().size() > 0) { - Pair<CubeSegment, CubeSegment> segs = getCubeManager().appendAndMergeSegments(cube, endDate); - job = builder.buildAndMergeJob(segs.getFirst(), segs.getSecond()); - } else { - CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate); - job = builder.buildJob(newSeg); - } + CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate); + job = builder.buildJob(newSeg); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg); job = builder.mergeJob(newSeg); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java index 49f1c32..f529145 100644 --- a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java +++ b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java @@ -20,7 +20,7 @@ package org.apache.kylin.storage.filter; import java.util.List; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java index 9ae4a6d..7533800 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java @@ -24,7 +24,7 @@ import java.util.Iterator; import java.util.Set; import org.apache.kylin.cube.kv.RowKeyColumnOrder; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java index fa4ccd7..9627efb 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java @@ -22,11 +22,11 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.BitSet; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; @@ -42,6 +42,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -51,19 +52,21 @@ import org.apache.kylin.cube.kv.RowKeyDecoder; import org.apache.kylin.cube.kv.RowValueDecoder; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.cube.model.HBaseColumnDesc; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITupleIterator; +import org.apache.kylin.metadata.tuple.Tuple; +import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler; -import org.apache.kylin.storage.tuple.Tuple; -import org.apache.kylin.storage.tuple.Tuple.IDerivedColumnFiller; -import org.apache.kylin.storage.tuple.TupleInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * @author xjiang @@ -78,7 +81,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private final Collection<TblColRef> dimensions; private final TupleFilter filter; private final Collection<TblColRef> groupBy; - private final Collection<RowValueDecoder> rowValueDecoders; + private final List<RowValueDecoder> rowValueDecoders; private final StorageContext context; private final String tableName; private final HTableInterface table; @@ -88,12 +91,21 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private Scan scan; private ResultScanner scanner; private Iterator<Result> resultIterator; + private CubeTupleConverter cubeTupleConverter; private TupleInfo tupleInfo; - private Tuple tuple; + private Tuple oneTuple; + private Tuple next; private int scanCount; private int scanCountDelta; - public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) { + final List<MeasureType<?>> measureTypes; + final List<MeasureType.IAdvMeasureFiller> advMeasureFillers; + final List<Pair<Integer, Integer>> advMeasureIndexInRV;//first=> which rowValueDecoders,second => metric index + + private int advMeasureRowsRemaining; + private int advMeasureRowIndex; + + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { this.cube = cubeSeg.getCubeInstance(); this.cubeSeg = cubeSeg; this.dimensions = dimensions; @@ -103,14 +115,22 @@ public class CubeSegmentTupleIterator implements ITupleIterator { this.context = context; this.tableName = cubeSeg.getStorageLocationIdentifier(); this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); + + measureTypes = Lists.newArrayList(); + advMeasureFillers = Lists.newArrayListWithCapacity(1); + advMeasureIndexInRV = Lists.newArrayListWithCapacity(1); + + this.cubeTupleConverter = new CubeTupleConverter(); + this.tupleInfo = buildTupleInfo(keyRanges.get(0).getCuboid()); + this.oneTuple = new Tuple(this.tupleInfo); + this.rangeIterator = keyRanges.iterator(); + try { this.table = conn.getTable(tableName); } catch (Throwable t) { throw new StorageException("Error when open connection to table " + tableName, t); } - this.rangeIterator = keyRanges.iterator(); - scanNextRange(); } @Override @@ -121,7 +141,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private void closeScanner() { flushScanCountDelta(); - + if (logger.isDebugEnabled() && scan != null) { logger.debug("Scan " + scan.toString()); byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); @@ -150,59 +170,90 @@ public class CubeSegmentTupleIterator implements ITupleIterator { } } + private void flushScanCountDelta() { + context.increaseTotalScanCount(scanCountDelta); + scanCountDelta = 0; + } + @Override - public boolean hasNext() { - return rangeIterator.hasNext() || resultIterator.hasNext(); + public void remove() { + throw new UnsupportedOperationException(); } @Override - public Tuple next() { - // get next result from hbase - Result result = null; - while (hasNext()) { - if (resultIterator.hasNext()) { - result = this.resultIterator.next(); - scanCount++; - if (++scanCountDelta >= 1000) - flushScanCountDelta(); - break; - } else { - scanNextRange(); + public boolean hasNext() { + + if (next != null) + return true; + + // consume any left rows from advanced measure filler + if (advMeasureRowsRemaining > 0) { + for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) { + filler.fillTuplle(oneTuple, advMeasureRowIndex); } + advMeasureRowIndex++; + advMeasureRowsRemaining--; + next = oneTuple; + return true; } - if (result == null) { - return null; + + if (resultIterator == null) { + if (rangeIterator.hasNext() == false) + return false; + + resultIterator = doScan(rangeIterator.next()); + } + + if (resultIterator.hasNext() == false) { + closeScanner(); + resultIterator = null; + return hasNext(); } - // translate result to tuple + + Result result = resultIterator.next(); + scanCount++; + if (++scanCountDelta >= 1000) + flushScanCountDelta(); + + // translate into tuple + List<MeasureType.IAdvMeasureFiller> retFillers = null; try { - translateResult(result, this.tuple); + retFillers = translateResult(result, oneTuple); } catch (IOException e) { - throw new IllegalStateException("Can't translate result " + result, e); + throw new RuntimeException(e); } - return this.tuple; - } - private void flushScanCountDelta() { - context.increaseTotalScanCount(scanCountDelta); - scanCountDelta = 0; - } + // the simple case + if (retFillers == null) { + next = oneTuple; + return true; + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } + // advanced measure filling, like TopN, will produce multiple tuples out of one record + advMeasureRowsRemaining = -1; + for (MeasureType.IAdvMeasureFiller filler : retFillers) { + if (advMeasureRowsRemaining < 0) + advMeasureRowsRemaining = filler.getNumOfRows(); + if (advMeasureRowsRemaining != filler.getNumOfRows()) + throw new IllegalStateException(); + } + if (advMeasureRowsRemaining < 0) + throw new IllegalStateException(); - private void scanNextRange() { - if (this.rangeIterator.hasNext()) { - closeScanner(); - HBaseKeyRange keyRange = this.rangeIterator.next(); - this.tupleInfo = buildTupleInfo(keyRange.getCuboid()); - this.tuple = new Tuple(this.tupleInfo); + advMeasureRowIndex = 0; + return hasNext(); + } - this.resultIterator = doScan(keyRange); - } else { - this.resultIterator = Collections.<Result> emptyList().iterator(); + @Override + public Tuple next() { + if (next == null) { + hasNext(); + if (next == null) + throw new NoSuchElementException(); } + Tuple r = next; + next = null; + return r; } private final Iterator<Result> doScan(HBaseKeyRange keyRange) { @@ -272,7 +323,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { scan.setCacheBlocks(true); // cache less when there are memory hungry measures - if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) { + if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) { scan.setCaching(scan.getCaching() / 10); } } @@ -306,6 +357,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { } private TupleInfo buildTupleInfo(Cuboid cuboid) { + TupleInfo info = new TupleInfo(); int index = 0; rowKeyDecoder.setCuboid(cuboid); @@ -331,21 +383,48 @@ public class CubeSegmentTupleIterator implements ITupleIterator { info.setField(derivedField, derivedCol, derivedCol.getType().getName(), index++); } // add filler - info.addDerivedColumnFiller(Tuple.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg)); + cubeTupleConverter.addDerivedColumnFiller(CubeTupleConverter.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg)); } } - for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) { - List<String> names = rowValueDecoder.getNames(); + for (int i = 0; i < rowValueDecoders.size(); i++) { + RowValueDecoder rowValueDecoder = rowValueDecoders.get(i); + List<String> measureNames = rowValueDecoder.getNames(); MeasureDesc[] measures = rowValueDecoder.getMeasures(); - for (int i = 0; i < measures.length; i++) { - String dataType = measures[i].getFunction().getSQLType().getName(); - info.setField(names.get(i), null, dataType, index++); + + BitSet projectionIndex = rowValueDecoder.getProjectionIndex(); + for (int mi = projectionIndex.nextSetBit(0); mi >= 0; mi = projectionIndex.nextSetBit(mi + 1)) { + FunctionDesc aggrFunc = measures[mi].getFunction(); + String dataType = measures[mi].getFunction().getRewriteFieldType().getName(); + info.setField(measureNames.get(mi), null, dataType, index++); + + MeasureType<?> measureType = aggrFunc.getMeasureType(); + if (measureType.needAdvancedTupleFilling()) { + Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc)); + advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap)); + advMeasureIndexInRV.add(Pair.newPair(i, mi)); + measureTypes.add(null); + } else { + measureTypes.add(measureType); + } } + // for (int i = 0; i < measures.length; i++) { + // String dataType = measures[i].getFunction().getRewriteFieldType().getName(); + // info.setField(measureNames.get(i), null, dataType, index++); + // } } return info; } + // load only needed dictionaries + private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { + Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); + for (TblColRef col : columnsNeedDictionary) { + result.put(col, cubeSeg.getDictionary(col)); + } + return result; + } + private String getFieldName(TblColRef column, Map<TblColRef, String> aliasMap) { String name = null; if (aliasMap != null) { @@ -357,7 +436,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { return name; } - private void translateResult(Result res, Tuple tuple) throws IOException { + private List<MeasureType.IAdvMeasureFiller> translateResult(Result res, Tuple tuple) throws IOException { // groups byte[] rowkey = res.getRow(); rowKeyDecoder.decode(rowkey); @@ -373,11 +452,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator { } // derived - for (IDerivedColumnFiller filler : tupleInfo.getDerivedColumnFillers()) { + for (CubeTupleConverter.IDerivedColumnFiller filler : cubeTupleConverter.getDerivedColumnFillers()) { filler.fillDerivedColumns(dimensionValues, tuple); } // aggregations + int measureIndex = 0; for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = rowValueDecoder.getHBaseColumn(); String columnFamily = hbaseColumn.getColumnFamilyName(); @@ -389,8 +469,23 @@ public class CubeSegmentTupleIterator implements ITupleIterator { Object[] measureValues = rowValueDecoder.getValues(); BitSet projectionIndex = rowValueDecoder.getProjectionIndex(); for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { - tuple.setMeasureValue(measureNames.get(i), measureValues[i]); + if (measureTypes.get(measureIndex) != null) { + measureTypes.get(measureIndex).fillTupleSimply(tuple, tupleInfo.getFieldIndex(measureNames.get(i)), measureValues[i]); + } + } + measureIndex++; + } + + // advanced measure filling, due to possible row split, will complete at caller side + if (advMeasureFillers.isEmpty()) { + return null; + } else { + for (int i = 0; i < advMeasureFillers.size(); i++) { + Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i); + Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()]; + advMeasureFillers.get(i).reload(measureValue); } + return advMeasureFillers; } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java index bbf088e..e612eb1 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java @@ -35,6 +35,7 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -45,8 +46,8 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseMappingDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; @@ -87,6 +88,8 @@ public class CubeStorageEngine implements IStorageEngine { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) { + // allow custom measures hack + notifyBeforeStorageQuery(sqlDigest); Collection<TblColRef> groups = sqlDigest.groupbyColumns; TupleFilter filter = sqlDigest.filter; @@ -354,12 +357,6 @@ public class CubeStorageEngine implements IStorageEngine { for (HBaseColumnDesc hbCol : hbCols) { bestHBCol = hbCol; bestIndex = hbCol.findMeasureIndex(aggrFunc); - MeasureDesc measure = hbCol.getMeasures()[bestIndex]; - // criteria for holistic measure: Exact Aggregation && Exact Cuboid - if (measure.isHolisticCountDistinct() && context.isExactAggregation()) { - logger.info("Holistic count distinct chosen for " + aggrFunc); - break; - } } RowValueDecoder codec = codecMap.get(bestHBCol); @@ -628,7 +625,7 @@ public class CubeStorageEngine implements IStorageEngine { } private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) { - if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) { + if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) { return; } @@ -638,7 +635,7 @@ public class CubeStorageEngine implements IStorageEngine { BitSet projectionIndex = decoder.getProjectionIndex(); for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { FunctionDesc func = measures[i].getFunction(); - rowSizeEst += func.getReturnDataType().getSpaceEstimate(); + rowSizeEst += func.getReturnDataType().getStorageBytesEstimate(); } } @@ -665,4 +662,13 @@ public class CubeStorageEngine implements IStorageEngine { ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context); } + + private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { + for (MeasureDesc measure : cubeDesc.getMeasures()) { + MeasureType<?> measureType = measure.getFunction().getMeasureType(); + measureType.adjustSqlDigest(measure, sqlDigest); + } + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java new file mode 100644 index 0000000..11987a4 --- /dev/null +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.common.util.Array; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.tuple.Tuple; +import org.apache.kylin.metadata.tuple.TupleInfo; + +public class CubeTupleConverter { + + private final List<IDerivedColumnFiller> derivedColumnFillers; + + public CubeTupleConverter() { + derivedColumnFillers = new ArrayList<IDerivedColumnFiller>(); + } + + public void addDerivedColumnFiller(IDerivedColumnFiller derivedColumnFiller) { + derivedColumnFillers.add(derivedColumnFiller); + } + + public List<IDerivedColumnFiller> getDerivedColumnFillers() { + return derivedColumnFillers; + } + + // ============================================================================ + + public static IDerivedColumnFiller newDerivedColumnFiller(List<TblColRef> rowColumns, TblColRef[] hostCols, CubeDesc.DeriveInfo deriveInfo, TupleInfo tupleInfo, CubeManager cubeMgr, CubeSegment cubeSegment) { + + int[] hostIndex = new int[hostCols.length]; + for (int i = 0; i < hostCols.length; i++) { + hostIndex[i] = rowColumns.indexOf(hostCols[i]); + } + String[] derivedFieldNames = new String[deriveInfo.columns.length]; + for (int i = 0; i < deriveInfo.columns.length; i++) { + derivedFieldNames[i] = tupleInfo.getFieldName(deriveInfo.columns[i]); + } + + switch (deriveInfo.type) { + case LOOKUP: + LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSegment, deriveInfo.dimension); + return new LookupFiller(hostIndex, lookupTable, deriveInfo, derivedFieldNames); + case PK_FK: + // composite key are split, see CubeDesc.initDimensionColumns() + return new PKFKFiller(hostIndex[0], derivedFieldNames[0]); + default: + throw new IllegalArgumentException(); + } + } + + public interface IDerivedColumnFiller { + public void fillDerivedColumns(List<String> rowValues, Tuple tuple); + } + + static class PKFKFiller implements IDerivedColumnFiller { + final int hostIndex; + final String derivedFieldName; + + public PKFKFiller(int hostIndex, String derivedFieldName) { + this.hostIndex = hostIndex; + this.derivedFieldName = derivedFieldName; + } + + @Override + public void fillDerivedColumns(List<String> rowValues, Tuple tuple) { + String value = rowValues.get(hostIndex); + tuple.setDimensionValue(derivedFieldName, value); + } + } + + static class LookupFiller implements IDerivedColumnFiller { + + final int[] hostIndex; + final int hostLen; + final Array<String> lookupKey; + final LookupStringTable lookupTable; + final int[] derivedIndex; + final int derivedLen; + final String[] derivedFieldNames; + + public LookupFiller(int[] hostIndex, LookupStringTable lookupTable, CubeDesc.DeriveInfo deriveInfo, String[] derivedFieldNames) { + this.hostIndex = hostIndex; + this.hostLen = hostIndex.length; + this.lookupKey = new Array<String>(new String[hostLen]); + this.lookupTable = lookupTable; + this.derivedIndex = new int[deriveInfo.columns.length]; + this.derivedLen = derivedIndex.length; + this.derivedFieldNames = derivedFieldNames; + + for (int i = 0; i < derivedLen; i++) { + derivedIndex[i] = deriveInfo.columns[i].getColumn().getZeroBasedIndex(); + } + } + + @Override + public void fillDerivedColumns(List<String> rowValues, Tuple tuple) { + for (int i = 0; i < hostLen; i++) { + lookupKey.data[i] = rowValues.get(hostIndex[i]); + } + + String[] lookupRow = lookupTable.getRow(lookupKey); + + if (lookupRow != null) { + for (int i = 0; i < derivedLen; i++) { + String value = lookupRow[derivedIndex[i]]; + tuple.setDimensionValue(derivedFieldNames[i], value); + } + } else { + for (int i = 0; i < derivedLen; i++) { + tuple.setDimensionValue(derivedFieldNames[i], null); + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java index a115753..d188a44 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java @@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private ITupleIterator segmentIterator; private int scanCount; - public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) { + public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { this.context = context; int limit = context.getLimit(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java index 47a011a..26f95b7 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java @@ -18,11 +18,12 @@ package org.apache.kylin.storage.hbase.coprocessor; -import java.util.SortedMap; -import org.apache.kylin.metadata.measure.MeasureAggregator; import com.google.common.collect.Maps; +import org.apache.kylin.measure.MeasureAggregator; + +import java.util.SortedMap; /** * Created by Hongbin Ma(Binmahone) on 11/27/14. @@ -60,7 +61,7 @@ public abstract class AggregationCache { rowMemBytes = 0; MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey()); for (MeasureAggregator agg : measureAggregators) { - rowMemBytes += agg.getMemBytes(); + rowMemBytes += agg.getMemBytesEstimate(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java index b3e2d31..2f6bf67 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.cube.kv.RowKeyColumnIO; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.ISegment; import org.apache.kylin.metadata.filter.*; import org.apache.kylin.metadata.filter.TupleFilterSerializer.Decorator; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java index 6affc18..bd2b912 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java @@ -19,7 +19,7 @@ package org.apache.kylin.storage.hbase.coprocessor; import org.apache.kylin.cube.kv.RowKeyColumnIO; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.ConstantTupleFilter; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java index 83e84a9..c33d192 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java @@ -21,7 +21,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint; import java.util.Map; import java.util.Set; -import org.apache.kylin.metadata.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.coprocessor.AggregationCache; http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java index 028da1e..3b4a9b2 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java @@ -29,9 +29,10 @@ import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.invertedindex.index.RawTableRecord; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; -import org.apache.kylin.metadata.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.hllc.HLLCMeasureType; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; -import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants; @@ -91,7 +92,7 @@ public class EndpointAggregators { throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II"); } - if (functionDesc.isCountDistinct()) { + if (HLLCMeasureType.isCountDistinct(functionDesc)) { metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision()); } else { metricInfos[i] = new MetricInfo(MetricType.Normal, index); @@ -143,10 +144,10 @@ public class EndpointAggregators { MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length]; for (int i = 0; i < aggrs.length; i++) { if (metricInfos[i].type == MetricType.DistinctCount) { - aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]); + aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance(dataTypes[i])); } else { //all other fixed length measures can be aggregated as long - aggrs[i] = MeasureAggregator.create(funcNames[i], "long"); + aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance("long")); } } return aggrs;