Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * RewriteParseContextGenerator offers methods to generate the operator tree
+ * for an input query. It follows the lines of the analyzeInternal(..) method
+ * of {@link SemanticAnalyzer}, but creates only the ParseContext for the input query command.
+ * It does not optimize or generate map-reduce tasks for the input query.
+ * This can be used when you need to create the operator tree for an internal query.
+ *
+ */
+public final class RewriteParseContextGenerator {
+  private static final Log LOG = LogFactory.getLog(RewriteParseContextGenerator.class.getName());
+
+  private RewriteParseContextGenerator(){
+  }
+
+  /**
+   * Parse the input {@link String} command, generate an ASTNode tree, and
+   * run semantic analysis to produce the operator tree.
+   * @param conf the Hive configuration to use for parsing and semantic analysis
+   * @param command the query command to compile
+   * @return the ParseContext holding the operator tree generated for the command
+   * @throws SemanticException
+   */
+  public static ParseContext generateOperatorTree(HiveConf conf,
+      String command) throws SemanticException{
+    Context ctx;
+    ParseContext subPCtx = null;
+    try {
+      ctx = new Context(conf);
+      ParseDriver pd = new ParseDriver();
+      ASTNode tree = pd.parse(command, ctx);
+      tree = ParseUtils.findRootNonNullToken(tree);
+
+      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
+      assert(sem instanceof SemanticAnalyzer);
+      doSemanticAnalysis((SemanticAnalyzer) sem, tree, ctx);
+
+      subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+      LOG.info("Sub-query Semantic Analysis Completed");
+    } catch (IOException e) {
+      LOG.error("IOException in generating the operator "
+        + "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    } catch (ParseException e) {
+      LOG.error("ParseException in generating the operator "
+        + "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    } catch (SemanticException e) {
+      LOG.error("SemanticException in generating the operator "
+        + "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    }
+    return subPCtx;
+
+  }
+
+  /**
+   * For the input ASTNode tree, perform semantic analysis and check metadata.
+   * Generates the operator tree; the caller retrieves the resulting
+   * {@link ParseContext} from the analyzer.
+   *
+   * @param sem the SemanticAnalyzer driving the analysis
+   * @param ast the root of the parsed AST
+   * @param ctx the Context for the command
+   * @throws SemanticException
+   */
+  private static void doSemanticAnalysis(SemanticAnalyzer sem,
+      ASTNode ast, Context ctx) throws SemanticException {
+    QB qb = new QB(null, null, false);
+    ASTNode child = ast;
+    ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+    subPCtx.setContext(ctx);
+    ((SemanticAnalyzer) sem).init(subPCtx);
+
+    LOG.info("Starting Sub-query Semantic Analysis");
+    sem.doPhase1(child, qb, sem.initPhase1Ctx());
+    LOG.info("Completed phase 1 of Sub-query Semantic Analysis");
+
+    sem.getMetaData(qb);
+    LOG.info("Completed getting MetaData in Sub-query Semantic Analysis");
+
+    LOG.info("Sub-query Abstract syntax tree: " + ast.toStringTree());
+    sem.genPlan(qb);
+
+    LOG.info("Sub-query Completed plan generation");
+  }
+
+}
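A minimal usage sketch for the generator above (illustrative only, not part of this
changeset): a caller that needs just the ParseContext for an internal query could
invoke it as follows, assuming a usable Hive configuration is available.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.optimizer.index.RewriteParseContextGenerator;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class ParseContextExample {
      public static void main(String[] args) throws SemanticException {
        HiveConf conf = new HiveConf(ParseContextExample.class);
        // Parses and semantically analyzes the command; the returned ParseContext
        // holds the generated operator tree (no optimization, no map-reduce tasks).
        ParseContext pctx = RewriteParseContextGenerator.generateOperatorTree(
            conf, "select key, count(key) from src group by key");
        System.out.println("Top operators: " + pctx.getTopOps().keySet());
      }
    }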
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java?rev=1170007&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java Tue Sep 13 02:20:52 2011 @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.index; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.OpParseContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +/** + * This class defines a procedure factory used to 
rewrite the operator plan.
+ * Each method replaces the necessary base table data structures with
+ * the index table data structures for each operator.
+ */
+public final class RewriteQueryUsingAggregateIndex {
+  private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName());
+  private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
+
+  private RewriteQueryUsingAggregateIndex() {
+    //this prevents the class from getting instantiated
+  }
+
+  private static class NewQuerySelectSchemaProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator operator = (SelectOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      List<Operator<? extends Serializable>> childOps = operator.getChildOperators();
+      Operator<? extends Serializable> childOp = childOps.iterator().next();
+
+      //we need to set the colList, outputColumnNames, colExprMap,
+      // rowSchema for only that SelectOperator which precedes the GroupByOperator
+      // count(indexed_key_column) needs to be replaced by sum(`_count_Of_indexed_key_column`)
+      if (childOp instanceof GroupByOperator){
+        List<ExprNodeDesc> selColList =
+          operator.getConf().getColList();
+        selColList.add(rewriteQueryCtx.getAggrExprNode());
+
+        List<String> selOutputColNames =
+          operator.getConf().getOutputColumnNames();
+        selOutputColNames.add(rewriteQueryCtx.getAggrExprNode().getColumn());
+
+        RowSchema selRS = operator.getSchema();
+        List<ColumnInfo> selRSSignature =
+          selRS.getSignature();
+        //Need to create a new type for Column[_count_Of_indexed_key_column] node
+        PrimitiveTypeInfo pti = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+        pti.setTypeName("bigint");
+        ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false);
+        selRSSignature.add(newCI);
+        selRS.setSignature((ArrayList<ColumnInfo>) selRSSignature);
+        operator.setSchema(selRS);
+      }
+      return null;
+    }
+  }
+
+  public static NewQuerySelectSchemaProc getNewQuerySelectSchemaProc(){
+    return new NewQuerySelectSchemaProc();
+  }
+
+
+  /**
+   * This processor replaces the original TableScanOperator with
+   * the new TableScanOperator and metadata that scans over the
+   * index table rather than scanning over the original table.
+   *
+   */
+  private static class ReplaceTableScanOpProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator scanOperator = (TableScanOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      String baseTableName = rewriteQueryCtx.getBaseTableName();
+      String alias = null;
+      if(baseTableName.contains(":")){
+        alias = (baseTableName.split(":"))[0];
+      }
+
+      //Need to remove the original TableScanOperators from these data structures
+      // and add new ones
+      Map<TableScanOperator, Table> topToTable =
+        rewriteQueryCtx.getParseContext().getTopToTable();
+      Map<String, Operator<? extends Serializable>> topOps =
+        rewriteQueryCtx.getParseContext().getTopOps();
+      Map<Operator<?
extends Serializable>, OpParseContext> opParseContext =
+        rewriteQueryCtx.getParseContext().getOpParseCtx();
+
+      //need this to set rowResolver for new scanOperator
+      OpParseContext operatorContext = opParseContext.get(scanOperator);
+
+      //remove original TableScanOperator
+      topToTable.remove(scanOperator);
+      topOps.remove(baseTableName);
+      opParseContext.remove(scanOperator);
+
+      //construct a new descriptor for the index table scan
+      TableScanDesc indexTableScanDesc = new TableScanDesc();
+      indexTableScanDesc.setGatherStats(false);
+
+      String indexTableName = rewriteQueryCtx.getIndexName();
+      Table indexTableHandle = null;
+      try {
+        indexTableHandle = rewriteQueryCtx.getHiveDb().getTable(indexTableName);
+      } catch (HiveException e) {
+        LOG.error("Error while getting the table handle for index table.");
+        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new SemanticException(e.getMessage(), e);
+      }
+
+      String k = indexTableName + Path.SEPARATOR;
+      indexTableScanDesc.setStatsAggPrefix(k);
+      scanOperator.setConf(indexTableScanDesc);
+
+      //Construct the new RowResolver for the new TableScanOperator
+      RowResolver rr = new RowResolver();
+      try {
+        StructObjectInspector rowObjectInspector =
+          (StructObjectInspector) indexTableHandle.getDeserializer().getObjectInspector();
+        List<? extends StructField> fields = rowObjectInspector
+            .getAllStructFieldRefs();
+        for (int i = 0; i < fields.size(); i++) {
+          rr.put(indexTableName, fields.get(i).getFieldName(), new ColumnInfo(fields
+              .get(i).getFieldName(), TypeInfoUtils
+              .getTypeInfoFromObjectInspector(fields.get(i)
+                  .getFieldObjectInspector()), indexTableName, false));
+        }
+      } catch (SerDeException e) {
+        LOG.error("Error while creating the RowResolver for new TableScanOperator.");
+        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new SemanticException(e.getMessage(), e);
+      }
+
+      //Set row resolver for new table
+      operatorContext.setRowResolver(rr);
+      String tabNameWithAlias = null;
+      if(alias != null){
+        tabNameWithAlias = alias + ":" + indexTableName;
+      }else{
+        tabNameWithAlias = indexTableName;
+      }
+
+      //Scan operator now points to the index table
+      topToTable.put(scanOperator, indexTableHandle);
+      scanOperator.getConf().setAlias(tabNameWithAlias);
+      scanOperator.setAlias(indexTableName);
+      topOps.put(tabNameWithAlias, scanOperator);
+      opParseContext.put(scanOperator, operatorContext);
+      rewriteQueryCtx.getParseContext().setTopToTable(
+        (HashMap<TableScanOperator, Table>) topToTable);
+      rewriteQueryCtx.getParseContext().setTopOps(
+        (HashMap<String, Operator<? extends Serializable>>) topOps);
+      rewriteQueryCtx.getParseContext().setOpParseCtx(
+        (LinkedHashMap<Operator<? extends Serializable>, OpParseContext>) opParseContext);
+
+      return null;
+    }
+  }
+
+  public static ReplaceTableScanOpProc getReplaceTableScanProc(){
+    return new ReplaceTableScanOpProc();
+  }
+
+  /**
+   * We need to replace the count(indexed_column_key) GenericUDAF aggregation
+   * function in the group-by construct with the "sum" GenericUDAF.
+   * This processor creates a new operator tree for a sample query that creates a GroupByOperator
+   * with the sum aggregation function and uses that GroupByOperator information to replace
+   * the original GroupByOperator aggregation information.
+   * It replaces the AggregationDesc (aggregation descriptor) of the old GroupByOperator with the
+   * new AggregationDesc of the new GroupByOperator.
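+   *
+   * Concretely (names illustrative): to obtain the new AggregationDesc, this
+   * processor compiles a helper query of the form
+   *   select sum(`_count_of_key`) from index_table group by key
+   * via {@link RewriteParseContextGenerator} and lifts the sum aggregation
+   * out of the GroupByOperator of the resulting operator tree.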
+   */
+  private static class NewQueryGroupbySchemaProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      GroupByOperator operator = (GroupByOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+
+      //We need to replace the GroupByOperator which is in
+      //groupOpToInputTables map with the new GroupByOperator
+      if(rewriteQueryCtx.getParseContext().getGroupOpToInputTables().containsKey(operator)){
+        List<ExprNodeDesc> gbyKeyList = operator.getConf().getKeys();
+        String gbyKeys = null;
+        Iterator<ExprNodeDesc> gbyKeyListItr = gbyKeyList.iterator();
+        while(gbyKeyListItr.hasNext()){
+          ExprNodeDesc expr = gbyKeyListItr.next().clone();
+          if(expr instanceof ExprNodeColumnDesc){
+            ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc)expr;
+            //append each group-by key column, comma-separated
+            if(gbyKeys == null){
+              gbyKeys = colExpr.getColumn();
+            }else{
+              gbyKeys = gbyKeys + "," + colExpr.getColumn();
+            }
+          }
+        }
+
+
+        //the query contains the sum aggregation GenericUDAF
+        String selReplacementCommand = "select sum(`" +
+          rewriteQueryCtx.getAggregateFunction() + "`)" +
+          " from " + rewriteQueryCtx.getIndexName() +
+          " group by " + gbyKeys + " ";
+        //create a new ParseContext for the query to retrieve its operator tree,
+        //and the required GroupByOperator from it
+        ParseContext newDAGContext = RewriteParseContextGenerator.generateOperatorTree(
+            rewriteQueryCtx.getParseContext().getConf(),
+            selReplacementCommand);
+
+        //we get our new GroupByOperator here
+        Map<GroupByOperator, Set<String>> newGbyOpMap = newDAGContext.getGroupOpToInputTables();
+        GroupByOperator newGbyOperator = newGbyOpMap.keySet().iterator().next();
+        GroupByDesc oldConf = operator.getConf();
+
+        //we need this information to set the correct colList, outputColumnNames in SelectOperator
+        ExprNodeColumnDesc aggrExprNode = null;
+
+        //Construct the new AggregationDesc to get rid of the current
+        //internal names and replace them with new internal names
+        //as required by the operator tree
+        GroupByDesc newConf = newGbyOperator.getConf();
+        List<AggregationDesc> newAggrList = newConf.getAggregators();
+        if(newAggrList != null && newAggrList.size() > 0){
+          for (AggregationDesc aggregationDesc : newAggrList) {
+            rewriteQueryCtx.setEval(aggregationDesc.getGenericUDAFEvaluator());
+            aggrExprNode = (ExprNodeColumnDesc)aggregationDesc.getParameters().get(0);
+            rewriteQueryCtx.setAggrExprNode(aggrExprNode);
+          }
+        }
+
+        //Now the GroupByOperator has the new AggregationList; sum(`_count_of_indexed_key`)
+        //instead of count(indexed_key)
+        OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
+        RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
+        gbyOPC.setRowResolver(gbyRR);
+        rewriteQueryCtx.getOpc().put(operator, gbyOPC);
+
+        oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
+        operator.setConf(oldConf);
+
+
+      }else{
+        //we just need to reset the GenericUDAFEvaluator and its name for this
+        //GroupByOperator whose parent is the ReduceSinkOperator
+        GroupByDesc childConf = (GroupByDesc) operator.getConf();
+        List<AggregationDesc> childAggrList = childConf.getAggregators();
+        if(childAggrList != null && childAggrList.size() > 0){
+          for (AggregationDesc aggregationDesc : childAggrList) {
+            List<ExprNodeDesc> paraList = aggregationDesc.getParameters();
+            List<TypeInfo> parametersTypeInfoList = new ArrayList<TypeInfo>();
+            for (ExprNodeDesc expr : paraList) {
+              parametersTypeInfoList.add(expr.getTypeInfo());
+            }
+            GenericUDAFEvaluator
evaluator = FunctionRegistry.getGenericUDAFEvaluator(
+                "sum", parametersTypeInfoList, false, false);
+            aggregationDesc.setGenericUDAFEvaluator(evaluator);
+            aggregationDesc.setGenericUDAFName("sum");
+          }
+        }
+
+      }
+
+      return null;
+    }
+  }
+
+  public static NewQueryGroupbySchemaProc getNewQueryGroupbySchemaProc(){
+    return new NewQueryGroupbySchemaProc();
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+/**
+ * RewriteQueryUsingAggregateIndexCtx stores the context for
+ * {@link RewriteQueryUsingAggregateIndex}, which rewrites the operator plan
+ * to use the index table instead of the base table.
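+ *
+ * For example (names illustrative; the index table and its `_count_of_...`
+ * column are produced by the aggregate index handler), a query such as
+ *   select key, count(key) from t group by key
+ * is rewritten to scan the index table instead:
+ *   select key, sum(`_count_of_key`) from t_index_table group by key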
+ */
+
+public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorCtx {
+
+  private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
+      String indexTableName, String baseTableName, String aggregateFunction){
+    this.parseContext = parseContext;
+    this.hiveDb = hiveDb;
+    this.indexTableName = indexTableName;
+    this.baseTableName = baseTableName;
+    this.aggregateFunction = aggregateFunction;
+    this.opc = parseContext.getOpParseCtx();
+  }
+
+  public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
+      Hive hiveDb, String indexTableName, String baseTableName, String aggregateFunction){
+    return new RewriteQueryUsingAggregateIndexCtx(
+        parseContext, hiveDb, indexTableName, baseTableName, aggregateFunction);
+  }
+
+
+  private Map<Operator<? extends Serializable>, OpParseContext> opc =
+    new LinkedHashMap<Operator<? extends Serializable>, OpParseContext>();
+  private final Hive hiveDb;
+  private final ParseContext parseContext;
+  //We need the GenericUDAFEvaluator for the GenericUDAF function "sum"
+  private GenericUDAFEvaluator eval = null;
+  private final String indexTableName;
+  private final String baseTableName;
+  private final String aggregateFunction;
+  private ExprNodeColumnDesc aggrExprNode = null;
+
+  public Map<Operator<? extends Serializable>, OpParseContext> getOpc() {
+    return opc;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public Hive getHiveDb() {
+    return hiveDb;
+  }
+
+  public String getIndexName() {
+    return indexTableName;
+  }
+
+  public GenericUDAFEvaluator getEval() {
+    return eval;
+  }
+
+  public void setEval(GenericUDAFEvaluator eval) {
+    this.eval = eval;
+  }
+
+  public void setAggrExprNode(ExprNodeColumnDesc aggrExprNode) {
+    this.aggrExprNode = aggrExprNode;
+  }
+
+  public ExprNodeColumnDesc getAggrExprNode() {
+    return aggrExprNode;
+  }
+
+  /**
+   * Walk the original operator tree with the {@link DefaultGraphWalker}, applying the rules below.
+   * Each rule invokes a processor from {@link RewriteQueryUsingAggregateIndex}
+   * to rewrite the original query to use the aggregate index.
+   *
+   * @param topOp the TableScanOperator at the root of the operator tree
+   * @throws SemanticException
+   */
+  public void invokeRewriteQueryProc(
+      Operator<? extends Serializable> topOp) throws SemanticException{
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    // replace scan operator containing original table with index table
+    opRules.put(new RuleRegExp("R1", "TS%"),
+        RewriteQueryUsingAggregateIndex.getReplaceTableScanProc());
+    //rule that replaces index key selection with
+    //sum(`_count_of_indexed_column`) function in original query
+    opRules.put(new RuleRegExp("R2", "SEL%"),
+        RewriteQueryUsingAggregateIndex.getNewQuerySelectSchemaProc());
+    //Manipulates the ExprNodeDesc from GroupByOperator aggregation list
+    opRules.put(new RuleRegExp("R3", "GBY%"),
+        RewriteQueryUsingAggregateIndex.getNewQueryGroupbySchemaProc());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, this);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of top operator nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(topOp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  /**
+   * Default procedure for {@link DefaultRuleDispatcher}.
+   * @return a no-op NodeProcessor for operators matched by none of the rules
+   */
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  public String getBaseTableName() {
+    return baseTableName;
+  }
+
+  public String getAggregateFunction() {
+    return aggregateFunction;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java Tue Sep 13 02:20:52 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,11 +30,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -46,19 +41,17 @@ import org.apache.hadoop.hive.ql.index.H
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 
 /**
  *
@@ -71,12 +64,10 @@ public class IndexWhereProcessor impleme
   private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName());
 
   private final Map<Table, List<Index>> indexes;
-  private Map<Index, Table> indexToIndexTable;
 
   public IndexWhereProcessor(Map<Table, List<Index>> indexes) {
     super();
     this.indexes = indexes;
-    this.indexToIndexTable = new HashMap<Index, Table>();
   }
 
   @Override
@@ -107,7 +98,7 @@ public class IndexWhereProcessor impleme
     // check if we have indexes on all partitions in this table scan
     Set<Partition> queryPartitions;
    try {
-      queryPartitions = checkPartitionsCoveredByIndex(operator, pctx);
+      queryPartitions = IndexUtils.checkPartitionsCoveredByIndex(operator, pctx, indexes);
      if (queryPartitions == null) { //
partitions not covered return null; } @@ -230,157 +221,6 @@ public class IndexWhereProcessor impleme return; } - /** - * Check the partitions used by the table scan to make sure they also exist in the - * index table - * @param pctx - * @param operator - * @return partitions used by query. null if they do not exist in index table - */ - private Set<Partition> checkPartitionsCoveredByIndex(TableScanOperator tableScan, ParseContext pctx) - throws HiveException { - Hive hive = Hive.get(pctx.getConf()); - - - // make sure each partition exists on the index table - PrunedPartitionList queryPartitionList = pctx.getOpToPartList().get(tableScan); - Set<Partition> queryPartitions = queryPartitionList.getConfirmedPartns(); - - for (Partition part : queryPartitions) { - List<Table> sourceIndexTables = getIndexTables(hive, part); - if (!containsPartition(hive, part)) { - return null; // problem if it doesn't contain the partition - } - } - - return queryPartitions; - } - - /** - * return index tables associated with a given base table - */ - private List<Table> getIndexTables(Hive hive, Table table) throws - HiveException { - List<Table> indexTables = new ArrayList<Table>(); - if (indexes == null || indexes.get(table) == null) { - return indexTables; - } - for (Index index : indexes.get(table)) { - Table indexTable = hive.getTable(index.getIndexTableName()); - indexToIndexTable.put(index, indexTable); - indexTables.add(indexTable); - } - return indexTables; - } - - /** - * return index tables associated with the base table of the partition - */ - private List<Table> getIndexTables(Hive hive, Partition part) throws HiveException { - List<Table> indexTables = new ArrayList<Table>(); - Table partitionedTable = part.getTable(); - if (indexes == null || indexes.get(partitionedTable) == null) { - return indexTables; - } - for (Index index : indexes.get(partitionedTable)) { - Table indexTable = hive.getTable(index.getIndexTableName()); - indexToIndexTable.put(index, indexTable); - indexTables.add(indexTable); - } - return indexTables; - } - - /** - * check that every index table contains the given partition and is fresh - */ - private boolean containsPartition(Hive hive, Partition part) - throws HiveException { - HashMap<String, String> partSpec = part.getSpec(); - - if (indexes == null || indexes.get(part.getTable()) == null) { - return false; - } - - if (partSpec.isEmpty()) { - // empty specs come from non-partitioned tables - return isIndexTableFresh(hive, indexes.get(part.getTable()), part.getTable()); - } - - for (Index index : indexes.get(part.getTable())) { - Table indexTable = indexToIndexTable.get(index); - // get partitions that match the spec - List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec); - if (matchingPartitions == null || matchingPartitions.size() == 0) { - LOG.info("Index table " + indexTable + "did not contain built partition that matched " + partSpec); - return false; - } else if (!isIndexPartitionFresh(hive, index, part)) { - return false; - } - } - return true; - } - - /** - * Check the index partitions on a parttioned table exist and are fresh - */ - private boolean isIndexPartitionFresh(Hive hive, Index index, - Partition part) throws HiveException { - LOG.info("checking index staleness..."); - try { - FileSystem partFs = part.getPartitionPath().getFileSystem(hive.getConf()); - FileStatus partFss = partFs.getFileStatus(part.getPartitionPath()); - String ts = index.getParameters().get(part.getSpec().toString()); - if (ts == null) { - return false; - } - 
long indexTs = Long.parseLong(ts); - LOG.info(partFss.getModificationTime()); - LOG.info(ts); - if (partFss.getModificationTime() > indexTs) { - LOG.info("index is stale on the partitions that matched " + part.getSpec()); - return false; - } - } catch (IOException e) { - LOG.info("failed to grab timestamp info"); - throw new HiveException(e); - } - return true; - } - - /** - * Check that the indexes on the unpartioned table exist and are fresh - */ - private boolean isIndexTableFresh(Hive hive, List<Index> indexes, Table src) - throws HiveException { - //check that they exist - if (indexes == null || indexes.size() == 0) { - return false; - } - //check that they are not stale - for (Index index : indexes) { - LOG.info("checking index staleness..."); - try { - FileSystem srcFs = src.getPath().getFileSystem(hive.getConf()); - FileStatus srcFss= srcFs.getFileStatus(src.getPath()); - String ts = index.getParameters().get("base_timestamp"); - if (ts == null) { - return false; - } - long indexTs = Long.parseLong(ts); - LOG.info(srcFss.getModificationTime()); - LOG.info(ts); - if (srcFss.getModificationTime() > indexTs) { - LOG.info("index is stale "); - return false; - } - } catch (IOException e) { - LOG.info("failed to grab timestamp info"); - throw new HiveException(e); - } - } - return true; - } - /** * Insert the rewrite tasks at the head of the pctx task tree Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java?rev=1170007&r1=1170006&r2=1170007&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java Tue Sep 13 02:20:52 2011 @@ -29,8 +29,8 @@ import java.util.Stack; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler; import org.apache.hadoop.hive.ql.index.bitmap.BitmapIndexHandler; +import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -40,8 +40,8 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -118,7 +118,7 @@ public class IndexWhereTaskDispatcher im Map<Table, List<Index>> indexes = new HashMap<Table, List<Index>>(); for (Table tbl : topTables) { - List<Index> tblIndexes = getIndexes(tbl, supportedIndexes); + List<Index> tblIndexes = IndexUtils.getIndexes(tbl, supportedIndexes); if (tblIndexes.size() > 0) { indexes.put(tbl, tblIndexes); } @@ -136,29 +136,6 @@ public class IndexWhereTaskDispatcher im return operatorRules; } - /** - * Get a list of indexes on a table that 
match given types. - * Copied from HIVE-1694 patch - */ - private List<Index> getIndexes(Table baseTableMetaData, List<String> matchIndexTypes) - throws SemanticException { - List<Index> matchingIndexes = new ArrayList<Index>(); - List<Index> indexesOnTable = null; - - try { - indexesOnTable = baseTableMetaData.getAllIndexes((short) -1); // get all indexes - } catch (HiveException e) { - throw new SemanticException("Error accessing metastore", e); - } - - for (Index index : indexesOnTable) { - String indexType = index.getIndexHandlerClass(); - if (matchIndexTypes.contains(indexType)) { - matchingIndexes.add(index); - } - } - return matchingIndexes; - } private NodeProcessor getDefaultProcessor() { return new NodeProcessor() { Added: hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q?rev=1170007&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q Tue Sep 13 02:20:52 2011 @@ -0,0 +1,170 @@ + +DROP TABLE lineitem; +CREATE TABLE lineitem (L_ORDERKEY INT, + L_PARTKEY INT, + L_SUPPKEY INT, + L_LINENUMBER INT, + L_QUANTITY DOUBLE, + L_EXTENDEDPRICE DOUBLE, + L_DISCOUNT DOUBLE, + L_TAX DOUBLE, + L_RETURNFLAG STRING, + L_LINESTATUS STRING, + l_shipdate STRING, + L_COMMITDATE STRING, + L_RECEIPTDATE STRING, + L_SHIPINSTRUCT STRING, + L_SHIPMODE STRING, + L_COMMENT STRING) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|'; + +LOAD DATA LOCAL INPATH '../data/files/lineitem.txt' OVERWRITE INTO TABLE lineitem; + +CREATE INDEX lineitem_lshipdate_idx ON TABLE lineitem(l_shipdate) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(l_shipdate)"); +ALTER INDEX lineitem_lshipdate_idx ON lineitem REBUILD; + +explain select l_shipdate, count(l_shipdate) +from lineitem +group by l_shipdate; + +select l_shipdate, count(l_shipdate) +from lineitem +group by l_shipdate +order by l_shipdate; + +set hive.optimize.index.groupby=true; + +explain select l_shipdate, count(l_shipdate) +from lineitem +group by l_shipdate; + +select l_shipdate, count(l_shipdate) +from lineitem +group by l_shipdate +order by l_shipdate; + +set hive.optimize.index.groupby=false; + + +explain select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments +from lineitem +group by year(l_shipdate), month(l_shipdate) +order by year, month; + +select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments +from lineitem +group by year(l_shipdate), month(l_shipdate) +order by year, month; + +set hive.optimize.index.groupby=true; + +explain select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments +from lineitem +group by year(l_shipdate), month(l_shipdate) +order by year, month; + +select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments +from lineitem +group by year(l_shipdate), month(l_shipdate) +order by year, month; + +explain select lastyear.month, + thisyear.month, + (thisyear.monthly_shipments - lastyear.monthly_shipments) / +lastyear.monthly_shipments as monthly_shipments_delta + from (select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments + from lineitem + where 
year(l_shipdate) = 1997 + group by year(l_shipdate), month(l_shipdate) + ) lastyear join + (select year(l_shipdate) as year, + month(l_shipdate) as month, + count(l_shipdate) as monthly_shipments + from lineitem + where year(l_shipdate) = 1998 + group by year(l_shipdate), month(l_shipdate) + ) thisyear + on lastyear.month = thisyear.month; + +explain select l_shipdate, cnt +from (select l_shipdate, count(l_shipdate) as cnt from lineitem group by l_shipdate +union all +select l_shipdate, l_orderkey as cnt +from lineitem) dummy; + +CREATE TABLE tbl(key int, value int); +CREATE INDEX tbl_key_idx ON TABLE tbl(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)"); +ALTER INDEX tbl_key_idx ON tbl REBUILD; + +EXPLAIN select key, count(key) from tbl where key = 1 group by key; +EXPLAIN select key, count(key) from tbl group by key; + +EXPLAIN select count(1) from tbl; +EXPLAIN select count(key) from tbl; + +EXPLAIN select key FROM tbl GROUP BY key; +EXPLAIN select key FROM tbl GROUP BY value, key; +EXPLAIN select key FROM tbl WHERE key = 3 GROUP BY key; +EXPLAIN select key FROM tbl WHERE value = 2 GROUP BY key; +EXPLAIN select key FROM tbl GROUP BY key, substr(key,2,3); + +EXPLAIN select key, value FROM tbl GROUP BY value, key; +EXPLAIN select key, value FROM tbl WHERE value = 1 GROUP BY key, value; + +EXPLAIN select DISTINCT key FROM tbl; +EXPLAIN select DISTINCT key FROM tbl; +EXPLAIN select DISTINCT key FROM tbl; +EXPLAIN select DISTINCT key, value FROM tbl; +EXPLAIN select DISTINCT key, value FROM tbl WHERE value = 2; +EXPLAIN select DISTINCT key, value FROM tbl WHERE value = 2 AND key = 3; +EXPLAIN select DISTINCT key, value FROM tbl WHERE value = key; +EXPLAIN select DISTINCT key, substr(value,2,3) FROM tbl WHERE value = key; +EXPLAIN select DISTINCT key, substr(value,2,3) FROM tbl; + +EXPLAIN select * FROM (select DISTINCT key, value FROM tbl) v1 WHERE v1.value = 2; + +DROP TABLE tbl; + +CREATE TABLE tblpart (key int, value string) PARTITIONED BY (ds string, hr int); +INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-08', hr=11) SELECT key, value FROM srcpart WHERE ds = '2008-04-08' AND hr = 11; +INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-08', hr=12) SELECT key, value FROM srcpart WHERE ds = '2008-04-08' AND hr = 12; +INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-09', hr=11) SELECT key, value FROM srcpart WHERE ds = '2008-04-09' AND hr = 11; +INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-09', hr=12) SELECT key, value FROM srcpart WHERE ds = '2008-04-09' AND hr = 12; + +CREATE INDEX tbl_part_index ON TABLE tblpart(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)"); + +ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-08', hr=11) REBUILD; +EXPLAIN SELECT key, count(key) FROM tblpart WHERE ds='2008-04-09' AND hr=12 AND key < 10 GROUP BY key; + +ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-08', hr=12) REBUILD; +ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-09', hr=11) REBUILD; +ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-09', hr=12) REBUILD; +EXPLAIN SELECT key, count(key) FROM tblpart WHERE ds='2008-04-09' AND hr=12 AND key < 10 GROUP BY key; + +DROP INDEX tbl_part_index on tblpart; +DROP TABLE tblpart; + +CREATE TABLE tbl(key int, value int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; +LOAD DATA LOCAL INPATH '../data/files/tbl.txt' OVERWRITE INTO TABLE 
tbl; + +CREATE INDEX tbl_key_idx ON TABLE tbl(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)"); +ALTER INDEX tbl_key_idx ON tbl REBUILD; + +set hive.optimize.index.groupby=false; +explain select key, count(key) from tbl group by key order by key; +select key, count(key) from tbl group by key order by key; +set hive.optimize.index.groupby=true; +explain select key, count(key) from tbl group by key order by key; +select key, count(key) from tbl group by key order by key; +DROP TABLE tbl; \ No newline at end of file
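The classes added above are driven from the group-by optimizer when
hive.optimize.index.groupby=true, as exercised by the test file. As a rough
sketch of the call pattern (hypothetical driver code, not part of this commit;
the table, index-table, and aggregate-column names are illustrative), a caller
holding the query's ParseContext and a metastore handle would do:

    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.optimizer.index.RewriteQueryUsingAggregateIndexCtx;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class AggregateIndexRewriteSketch {
      static void rewrite(ParseContext pctx, Hive hiveDb, TableScanOperator topOp)
          throws SemanticException {
        RewriteQueryUsingAggregateIndexCtx rewriteCtx =
            RewriteQueryUsingAggregateIndexCtx.getInstance(
                pctx, hiveDb,
                "default__tbl_tbl_key_idx__", // generated index table (illustrative)
                "tbl",                        // base table in the original query
                "_count_of_key");             // count column stored in the index
        // Walks the operator tree, swapping the base-table scan for the index
        // table scan and count(key) for sum(`_count_of_key`).
        rewriteCtx.invokeRewriteQueryProc(topOp);
      }
    }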
