http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 1c93617..bd8b4c6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -18,29 +18,46 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable -import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ -import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} -import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} +import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema} -import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.preagg.{AggregateQueryPlan, AggregateTableSelector, QueryColumn} import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo} -import org.apache.carbondata.spark.util.CarbonScalaUtil /** + * model class to store aggregate expression logical plan + * and its column schema mapping + * @param expression aggregate expression + * @param columnSchema list of column schema from table + */ +case class AggExpToColumnMappingModel( + expression: Expression, + var columnSchema: Option[Object] = None) { + override def equals(o: Any) : Boolean = o match { + case that: AggExpToColumnMappingModel => + that.expression==this.expression + case _ => false + } + // TODO need to update the hash code generation code + override def hashCode : Int = 1 +} +/** * Class for applying Pre Aggregate rules * Responsibility. * 1. Check plan is valid plan for updating the parent table plan with child table @@ -73,11 +90,36 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil * 6.1 validate maintable has timeseries datamap * 6.2 timeseries function is valid function or not * - * @param sparkSession - * spark session + * @param sparkSession spark session */ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + /** + * map for keeping parent attribute reference to child attribute reference + * this will be used to updated the plan in case of join or order by + */ + val updatedExpression = mutable.HashMap[AttributeReference, AttributeReference]() + + /** + * parser + */ + lazy val parser = new CarbonSpark2SqlParser + + /** + * Below method will be used to validate the logical plan + * @param logicalPlan query logical plan + * @return isvalid or not + */ + private def isValidPlan(logicalPlan: LogicalPlan) : Boolean = { + var isValidPlan = true + logicalPlan.transform { + case aggregate@Aggregate(grp, aExp, child) => + isValidPlan = !aExp.exists(p => p.name.equals("preAggLoad") || p.name.equals("preAgg")) + val updatedAggExp = aExp.filterNot(_.name.equalsIgnoreCase("preAggLoad")) + Aggregate(grp, updatedAggExp, child) + } + isValidPlan + } override def apply(plan: LogicalPlan): LogicalPlan = { var needAnalysis = true plan.transformExpressions { @@ -98,244 +140,403 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate( + updateExpression(grp), + updateExpression(aggExp.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]], + child) + case Filter(filterExp, child) => + Filter(updateExpression(Seq(filterExp)).head, child) + case Project(pList, child) => + Project( + updateExpression(pList.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]], + child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + val updatedCondition = condition match { + case Some(expr) => Some(updateExpression(Seq(expr)).head) + case _ => condition + } + Join(left, right, joinType, updatedCondition) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + SortOrder(order.child transform { + case attr: AttributeReference => + updatedExpression.find { p => p._1.sameRef(attr) } match { + case Some((_, childAttr)) => + AttributeReference( + childAttr.name, + childAttr.dataType, + childAttr.nullable, + childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated) + case None => + attr } - carbonTable + }, order.direction ) + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the expression like group by expression + * @param expressions sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + expressions map { expression => + expression transform { + case attr: AttributeReference => + updatedExpression.find { p => p._1.sameRef(attr) } match { + case Some((_, childAttr)) => + AttributeReference( + childAttr.name, + childAttr.dataType, + childAttr.nullable, + childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated) + case None => + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + } + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * 3. Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate( + grExp, + aggExp, + child@CarbonSubqueryAlias(_, l: LogicalRelation)) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(l) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + val isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, - carbonTable = carbonTable, - tableName = tableName) + agg) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(grExp, + aggExp, + child, + None, + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + newChild) + } else { + agg } - carbonTable - // case for handling aggregation with order by and filter when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + } else { + agg + } + // case of handling aggregation query with filter + case agg@Aggregate( + grExp, + aggExp, + Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + val carbonTable = getCarbonTable(l) + val list = scala.collection.mutable.HashSet.empty[QueryColumn] + val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression] + var isValidPlan = extractQueryColumnsFromAggExpression( + grExp, + aggExp, + carbonTable, + list, + aggregateExpressions) + // getting the columns from filter expression + isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression) + if (isValidPlan) { + extractColumnFromExpression(expression, list, carbonTable, true) + } + if(isValidPlan) { + val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list, + aggregateExpressions, carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - if(isValidPlan) { - list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, - carbonTable = carbonTable, - tableName = tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) - } - carbonTable - case _ => - isValidPlan = false - null - } - if (isValidPlan && null != carbonTable) { - isValidPlan = isSpecificSegmentPresent(carbonTable) - } - // if plan is valid then update the plan with child attributes - if (isValidPlan) { - // getting all the projection columns - val listProjectionColumn = list - .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) - .toList - // getting all the filter columns - val listFilterColumn = list - .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) - .toList - // getting all the aggregation columns - val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) - .toList - // create a query plan object which will be used to select the list of pre aggregate tables - // matches with this plan - val queryPlan = new QueryPlan(listProjectionColumn.asJava, - listAggregationColumn.asJava, - listFilterColumn.asJava) - // create aggregate table selector object - val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) - // select the list of valid child tables - val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() - // if it does not match with any pre aggregate table return the same plan - if (!selectedDataMapSchemas.isEmpty) { - // filter the selected child schema based on size to select the pre-aggregate tables - // that are nonEmpty - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema => - val identifier = TableIdentifier( - selectedDataMapSchema.getRelationIdentifier.getTableName, - Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName)) - val carbonRelation = - catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation] - val relation = sparkSession.sessionState.catalog.lookupRelation(identifier) - (selectedDataMapSchema, carbonRelation, relation) - }.filter(_._2.sizeInBytes != 0L) - if (relationBuffer.isEmpty) { - // If the size of relation Buffer is 0 then it means that none of the pre-aggregate - // tables have date yet. - // In this case we would return the original plan so that the query hits the parent - // table. - plan + agg) + if(null != aggDataMapSchema && null!= childPlan) { + val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(grExp, + aggExp, + child, + Some(expression), + aggDataMapSchema, + attributes, + childPlan, + carbonTable, + agg) + Aggregate(updatedGroupExp, + updatedAggExp, + Filter(updatedFilterExpression.get, + newChild)) } else { - // If the relationBuffer is nonEmpty then find the table with the minimum size. - val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes) - val newRelation = new FindDataSourceTable(sparkSession).apply(relation) - // transform the query plan based on selected child schema - transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation) + agg } } else { - plan + agg } + } + updatedPlan + } + + /** + * Below method will be used to validate query plan and get the proper aggregation data map schema + * and child relation plan object if plan is valid for transformation + * @param queryColumns list of query columns from projection and filter + * @param aggregateExpressions list of aggregate expression (aggregate function) + * @param carbonTable parent carbon table + * @param parentLogicalPlan parent logical relation + * @return if plan is valid then aggregation data map schema and its relation plan + */ + def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn], + aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression], + carbonTable: CarbonTable, + parentLogicalPlan: LogicalPlan): (AggregationDataMapSchema, LogicalPlan) = { + // getting all the projection columns + val listProjectionColumn = queryColumns + .filter(queryColumn => !queryColumn.isFilterColumn) + .toList + // getting all the filter columns + val listFilterColumn = queryColumns + .filter(queryColumn => queryColumn.isFilterColumn) + .toList + val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0 + // create a query plan object which will be used to select the list of pre aggregate tables + // matches with this plan + val queryPlan = new AggregateQueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava) + // create aggregate table selector object + val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) + // select the list of valid child tables + val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() + // query has only aggregate expression then selected data map will be empty + // the validate all the child data map otherwise validate selected data map + var selectedAggMaps = if (isProjectionColumnPresent) { + selectedDataMapSchemas + } else { + carbonTable.getTableInfo.getDataMapSchemaList + } + // if it does not match with any pre aggregate table return the same plan + if (!selectedAggMaps.isEmpty) { + // filter the selected child schema based on size to select the pre-aggregate tables + // that are nonEmpty + val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore + val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema => + val identifier = TableIdentifier( + selectedDataMapSchema.getRelationIdentifier.getTableName, + Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName)) + val carbonRelation = + catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation] + val relation = sparkSession.sessionState.catalog.lookupRelation(identifier) + (selectedDataMapSchema, carbonRelation, relation) + }.filter(_._2.sizeInBytes != 0L).sortBy(_._2.sizeInBytes) + if (relationBuffer.isEmpty) { + // If the size of relation Buffer is 0 then it means that none of the pre-aggregate + // tables have data yet. + // In this case we would return the original plan so that the query hits the parent + // table. + (null, null) } else { - plan + // if query does not have any aggregate function no need to validate the same + val tuple = if (aggregateExpressions.nonEmpty && !selectedAggMaps.isEmpty) { + relationBuffer.collectFirst { + case a@(datamapSchema, _, _) + if validateAggregateExpression(datamapSchema, + carbonTable, + parentLogicalPlan, + aggregateExpressions.toSeq) => + a + } + } else { + Some(relationBuffer.head) + } + tuple match { + case Some((dataMapSchema, _, logicalPlan)) => (dataMapSchema + .asInstanceOf[AggregationDataMapSchema], new FindDataSourceTable(sparkSession) + .apply(logicalPlan)) + case None => (null, null) + } + // If the relationBuffer is nonEmpty then find the table with the minimum size. + } + } else { + (null, null) + } + } + + /** + * Below method will be used to validate aggregate expression with the data map + * and will return the selected valid data maps + * @param selectedDataMap list of data maps + * @param carbonTable parent carbon table + * @param parentLogicalPlan parent logical plan + * @param queryAggExpLogicalPlans query agg expression logical plan + * @return valid data map + */ + def validateAggregateExpression(selectedDataMap: DataMapSchema, + carbonTable: CarbonTable, + parentLogicalPlan: LogicalPlan, + queryAggExpLogicalPlans: Seq[AggregateExpression]): Boolean = { + val mappingModel = getExpressionToColumnMapping(selectedDataMap, + carbonTable, + parentLogicalPlan) + queryAggExpLogicalPlans.forall{p => + mappingModel.exists{m => + PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes) == m.expression} + } + } + + /** + * Below method will be used to to get the logical plan for each aggregate expression in + * child data map and its column schema mapping if mapping is already present + * then it will use the same otherwise it will generate and stored in aggregation data map + * @param selectedDataMap child data map + * @param carbonTable parent table + * @param parentLogicalPlan logical relation of actual plan + * @return map of logical plan for each aggregate expression in child query and its column mapping + */ + def getExpressionToColumnMapping(selectedDataMap: DataMapSchema, + carbonTable: CarbonTable, + parentLogicalPlan: LogicalPlan): mutable.Set[AggExpToColumnMappingModel] = { + val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema] + if(null == aggDataMapSchema.getAggExpToColumnMapping) { + // add preAGG UDF to avoid all the PreAggregate rule + val childDataMapQueryString = parser.addPreAggFunction( + aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY")) + // get the logical plan + val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan + // getting all aggregate expression from query + val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan) + // in case of average child table will have two columns which will be stored in sequence + // so for average expression we need to get two columns for mapping + var counter = 0 + // sorting the columns based on schema ordinal so search will give proper result + val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala + .sortBy(_.getSchemaOrdinal) + val expressionToColumnMapping = mutable.LinkedHashSet.empty[AggExpToColumnMappingModel] + dataMapAggExp.foreach { aggExp => + val updatedExp = PreAggregateUtil.normalizeExprId(aggExp, aggPlan.allAttributes) + val model = AggExpToColumnMappingModel(updatedExp, None) + if (!expressionToColumnMapping.contains(model)) { + // check if aggregate expression is of type avg + // get the columns + val columnSchema = aggDataMapSchema + .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava) + // increment the counter so when for next expression above code will be + // executed it will search from that schema ordinal + counter = columnSchema.getSchemaOrdinal + 1 + model.columnSchema = Some(columnSchema) + expressionToColumnMapping += model + } } + aggDataMapSchema.setAggExpToColumnMapping(expressionToColumnMapping.asJava) + // return the mapping + expressionToColumnMapping + } else { + aggDataMapSchema.getAggExpToColumnMapping + .asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala + .asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]] } } /** + * Below method will be used to get aggregate expression + * @param logicalPlan logical plan + * @return list of aggregate expression + */ + def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = { + val list = scala.collection.mutable.ListBuffer.empty[AggregateExpression] + logicalPlan match { + case _@Aggregate(_, aggExp, _) => + aggExp map { + case Alias(attr: AggregateExpression, _) => + list ++= PreAggregateUtil.validateAggregateFunctionAndGetFields(attr) + case _ => + } + } + list + } + + /** * Below method will be used to check whether specific segment is set for maintable * if it is present then no need to transform the plan and query will be executed on * maintable - * @param carbonTable - * parent table + * @param carbonTable parent table * @return is specific segment is present in session params */ def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = { @@ -353,26 +554,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule /** * Below method will be used to extract the query columns from * filter expression - * @param filterExp - * filter expression - * @param set - * query column list - * @param carbonTable - * parent table - * @param tableName - * table name + * @param expression filter expression + * @param queryColumns query column set + * @param carbonTable parent table * @return isvalid filter expression for aggregate */ - def extractQueryColumnFromFilterExp(filterExp: Expression, - set: scala.collection.mutable.HashSet[QueryColumn], - carbonTable: CarbonTable, tableName: String): Boolean = { + def extractColumnFromExpression(expression: Expression, + queryColumns: scala.collection.mutable.HashSet[QueryColumn], + carbonTable: CarbonTable, + isFilterColumn: Boolean = false) { // map to maintain attribute reference present in the filter to timeseries function // if applied this is added to avoid duplicate column val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String] - val isValidPlan = true - filterExp.transform { + expression.transform { case attr: AttributeReference => - if (!mapOfColumnSeriesFun.get(attr).isDefined) { + if (mapOfColumnSeriesFun.get(attr).isEmpty) { mapOfColumnSeriesFun.put(attr, null) } attr @@ -381,13 +577,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") && CarbonUtil.hasTimeSeriesDataMap(carbonTable)) { - mapOfColumnSeriesFun.put(udf.children(0).asInstanceOf[AttributeReference], - udf.children(1).asInstanceOf[Literal].value.toString) + mapOfColumnSeriesFun.put(udf.children.head.asInstanceOf[AttributeReference], + udf.children.last.asInstanceOf[Literal].value.toString) } else { // for any other scala udf udf.transform { case attr: AttributeReference => - if (!mapOfColumnSeriesFun.get(attr).isDefined) { + if (mapOfColumnSeriesFun.get(attr).isEmpty) { mapOfColumnSeriesFun.put(attr, null) } attr @@ -396,90 +592,37 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule udf } mapOfColumnSeriesFun.foreach { f => - if (f._2 == null) { - set += - getQueryColumn(f._1.name, carbonTable, tableName, isFilterColumn = true) - } else { - set += getQueryColumn(f._1.name, - carbonTable, - carbonTable.getTableName, - isFilterColumn = true, - timeseriesFunction = f._2) - } - } - isValidPlan - } - /** - * Below method will be used to extract columns from order by expression - * @param projectList - * project list from plan - * @param sortOrders - * sort order in plan - * @param carbonTable - * carbon table - * @param tableName - * table name - * @return query columns from expression - */ - def extractQueryColumnForOrderBy(projectList: Option[Seq[NamedExpression]] = None, - sortOrders: Seq[SortOrder], - carbonTable: CarbonTable, - tableName: String): Seq[QueryColumn] = { - val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] - if(projectList.isDefined) { - projectList.get.map { - proList => - proList.transform { - case attr: AttributeReference => - val queryColumn = getQueryColumn(attr.name, carbonTable, tableName) - if (null != queryColumn) { - list += queryColumn - } - attr - } + if (f._2 == null) { + queryColumns += + getQueryColumn(f._1.name, carbonTable, isFilterColumn) + } else { + queryColumns += getQueryColumn(f._1.name, + carbonTable, + isFilterColumn, + timeseriesFunction = f._2) } } - sortOrders.foreach { sortOrder => - sortOrder.child match { - case attr: AttributeReference => - val queryColumn = getQueryColumn(attr.name, carbonTable, tableName) - if (null != queryColumn) { - list += queryColumn - } - } - } - list } /** * Below method will be used to get the child attribute reference * based on parent name * - * @param dataMapSchema - * child schema - * @param attributeReference - * parent attribute reference - * @param attributes - * child logical relation - * @param aggFunction - * aggregation function applied on child - * @param canBeNull - * this is added for strict validation in which case child attribute can be + * @param dataMapSchema child schema + * @param attributeReference parent attribute reference + * @param attributes child logical relation + * @param canBeNull this is added for strict validation in which case child attribute can be * null and when it cannot be null * @return child attribute reference */ def getChildAttributeReference(dataMapSchema: DataMapSchema, attributeReference: AttributeReference, attributes: Seq[AttributeReference], - aggFunction: String = "", canBeNull: Boolean = false, timeseriesFunction: String = ""): AttributeReference = { - val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]; - val columnSchema = if (aggFunction.isEmpty && timeseriesFunction.isEmpty) { + val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema] + val columnSchema = if (timeseriesFunction.isEmpty) { aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase) - } else if (!aggFunction.isEmpty) { - aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase, - aggFunction) } else { aggregationDataMapSchema .getTimeseriesChildColByParent(attributeReference.name.toLowerCase, @@ -499,254 +642,6 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** - * Below method will be used to transform the main table plan to child table plan - * rules for transformming is as below. - * 1. Grouping expression rules - * 1.1 Change the parent attribute reference for of group expression - * to child attribute reference - * - * 2. Aggregate expression rules - * 2.1 Change the parent attribute reference for of group expression to - * child attribute reference - * 2.2 Change the count AggregateExpression to Sum as count - * is already calculated so in case of aggregate table - * we need to apply sum to get the count - * 2.2 In case of average aggregate function select 2 columns from aggregate table with - * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). - * Note: During aggregate table creation for average table will be created with two columns - * one for sum(column) and count(column) to support rollup - * 3. Filter Expression rules. - * 3.1 Updated filter expression attributes with child table attributes - * 4. Update the Parent Logical relation with child Logical relation - * 5. Order by plan rules. - * 5.1 Update project list based on updated aggregate expression - * 5.2 Update sort order attributes based on pre aggregate table - * 6. timeseries function - * 6.1 validate parent table has timeseries datamap - * 6.2 timeseries function is valid function or not - * - * @param logicalPlan - * parent logical plan - * @param aggDataMapSchema - * select data map schema - * @param childPlan - * child carbon table relation - * @return transformed plan - */ - def transformPreAggQueryPlan(logicalPlan: LogicalPlan, - aggDataMapSchema: DataMapSchema, - childPlan: LogicalPlan): LogicalPlan = { - val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] - logicalPlan.transform { - // case for aggregation query - case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation)) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(grExp, - aggExp, - child, - None, - aggDataMapSchema, - attributes, - childPlan) - Aggregate(updatedGroupExp, - updatedAggExp, - newChild) - // case of handling aggregation query with filter - case Aggregate(grExp, - aggExp, - Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = - getUpdatedExpressions(grExp, - aggExp, - child, - Some(expression), - aggDataMapSchema, - attributes, - childPlan) - Aggregate(updatedGroupExp, - updatedAggExp, - Filter(updatedFilterExpression.get, - newChild)) - // case for aggregation query - case Aggregate(grExp, aggExp, l: LogicalRelation) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(grExp, - aggExp, - l, - None, - aggDataMapSchema, - attributes, - childPlan) - Aggregate(updatedGroupExp, - updatedAggExp, - newChild) - // case for aggregation query with order by - case Project(_, - Sort(sortOrders, - global, - Aggregate(groupingExp, - aggregateExp, - subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(groupingExp, - aggregateExp, - subQuery, - None, - aggDataMapSchema, - attributes, - childPlan) - val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, - sortOrders, - aggDataMapSchema, - attributes) - Project(updatedProjectList, - Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))) - // case for handling aggregation query with filter and order by - case Project(_, - Sort(sortOrders, - global, - Aggregate(groupingExp, - aggregateExp, - Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = - getUpdatedExpressions(groupingExp, - aggregateExp, - subQuery, - Some(expression), - aggDataMapSchema, - attributes, - childPlan) - val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, - sortOrders, - aggDataMapSchema, - attributes) - Project(updatedProjectList, - Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, - Filter(updatedFilterExpression.get, newChild)))) - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - global, - Aggregate( - groupingExp, - aggregateExp, - subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, None) = - getUpdatedExpressions(groupingExp, - aggregateExp, - subQuery, - None, - aggDataMapSchema, - attributes, - childPlan) - val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, - sortOrders, - aggDataMapSchema, - attributes) - Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) - // case for handling aggregation with order by and filter when only projection column exits - case Sort(sortOrders, - global, - Aggregate(groupingExp, - aggregateExp, - Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) - if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = - getUpdatedExpressions(groupingExp, - aggregateExp, - subQuery, - Some(expression), - aggDataMapSchema, - attributes, - childPlan) - val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, - sortOrders, - aggDataMapSchema, - attributes) - Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) - } - } - - /** - * Below method will be used to updated the maintable plan for order by query - * In case of order by we need to update project list and sort order attributes. - * - * @param aggregateExp - * child table aggregate expression - * @param sortOrders - * sort order expression in maintable plan - * @param aggDataMapSchema - * child data map schema - * @param attributes - * child attributes - * @return updated project list and updated sort order - */ - def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression], - sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema, - attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = { - val updatedProjectList = new ArrayBuffer[NamedExpression]() - // getting the updated project list from aggregate expression - aggregateExp.foreach{f => f.transform { - // for projection column - case alias@Alias(attr: AttributeReference, name) => - updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, - alias.qualifier, - alias.isGenerated) - alias - // for aggregaton column - case alias@Alias(attr: AggregateExpression, name) => - updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, - alias.qualifier, - alias.isGenerated) - alias - case alias@Alias(exp: Expression, name) => - updatedProjectList += AttributeReference(name, exp.dataType, exp.nullable)(alias.exprId, - alias.qualifier, - alias.isGenerated) - alias - } - } - // getting the updated sort order - val updatedSortOrders = sortOrders.map { order => - order.child match { - case attr: AttributeReference => - val childAttribute = getChildAttributeReference(aggDataMapSchema, - attr, - attributes, - canBeNull = true) - // child attribute can be null only in case of alias in query - // so in that case we need to update the sortorder based on new alias - if (null != childAttribute) { - val childExpression = getUpdatedSortOrderExpression(childAttribute, aggregateExp) - SortOrder(childExpression, order.direction) - } else { - val childExpression = getUpdatedSortOrderExpression(attr, aggregateExp) - SortOrder(childExpression, order.direction) - } - } - } - (updatedProjectList, updatedSortOrders) - } - /** * Below method will be used to get the updated expression for pre aggregated table. * It will replace the attribute of actual plan with child table attributes. * Updation will be done for below expression. @@ -755,37 +650,41 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * 3. child logical plan * 4. filter expression if present * - * @param groupingExpressions - * actual plan grouping expression - * @param aggregateExpressions - * actual plan aggregate expression - * @param child - * child logical plan - * @param filterExpression - * filter expression - * @param aggDataMapSchema - * pre aggregate table schema - * @param attributes - * pre aggregate table logical relation - * @param aggPlan - * aggregate logical plan + * @param groupingExpressions actual plan grouping expression + * @param aggregateExpressions actual plan aggregate expression + * @param child child logical plan + * @param filterExpression filter expression + * @param aggDataMapSchema pre aggregate table schema + * @param attributes pre aggregate table logical relation + * @param aggPlan aggregate logical plan * @return tuple of(updated grouping expression, - * updated aggregate expression, - * updated child logical plan, - * updated filter expression if present in actual plan) + * updated aggregate expression, + * updated child logical plan, + * updated filter expression if present in actual plan) */ def getUpdatedExpressions(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan, filterExpression: Option[Expression] = None, - aggDataMapSchema: DataMapSchema, + aggDataMapSchema: AggregationDataMapSchema, attributes: Seq[AttributeReference], - aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan, - Option[Expression]) = { + aggPlan: LogicalPlan, + parentTable: CarbonTable, + parentLogicalPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan, + Option[Expression]) = { + val aggExpColumnMapping = if (null != aggDataMapSchema.getAggExpToColumnMapping) { + Some(aggDataMapSchema.getAggExpToColumnMapping + .asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala + .asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]]) + } else { + None + } + // transforming the group by expression attributes with child attributes val updatedGroupExp = groupingExpressions.map { exp => exp.transform { case attr: AttributeReference => - getChildAttributeReference(aggDataMapSchema, attr, attributes) + val childAttr = getChildAttributeReference(aggDataMapSchema, attr, attributes) + childAttr } } // below code is for updating the aggregate expression. @@ -806,59 +705,97 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val updatedAggExp = aggregateExpressions.map { // case for attribute reference case attr: AttributeReference => - val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + val childAttr = getChildAttributeReference(aggDataMapSchema, attr, attributes) + val newExpressionId = NamedExpression.newExprId + val childTableAttr = AttributeReference(attr.name, + childAttr.dataType, + childAttr.nullable, + childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated) + updatedExpression.put(attr, childTableAttr) // returning the alias to show proper column name in output - Alias(childAttributeReference, - attr.name)(NamedExpression.newExprId, - childAttributeReference.qualifier).asInstanceOf[NamedExpression] + Alias(childAttr, + attr.name)(newExpressionId, + childAttr.qualifier).asInstanceOf[NamedExpression] // case for alias - case Alias(attr: AttributeReference, name) => - val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + case alias@Alias(attr: AttributeReference, name) => + val childAttr = getChildAttributeReference(aggDataMapSchema, attr, attributes) + val newExpressionId = NamedExpression.newExprId + val parentTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) + val childTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated) + updatedExpression.put(parentTableAttr, childTableAttr) // returning alias with child attribute reference - Alias(childAttributeReference, - name)(NamedExpression.newExprId, - childAttributeReference.qualifier).asInstanceOf[NamedExpression] + Alias(childAttr, + name)(newExpressionId, + childAttr.qualifier).asInstanceOf[NamedExpression] // for aggregate function case case alias@Alias(attr: AggregateExpression, name) => // get the updated aggregate aggregate function - val aggExp = getUpdatedAggregateExpressionForChild(attr, - aggDataMapSchema, - attributes) + val aggExp = if (aggExpColumnMapping.isDefined) { + getUpdatedAggregateExpressionForChild(attr, + aggDataMapSchema, + attributes, + parentTable, + parentLogicalPlan, + aggExpColumnMapping.get) + } else { + attr + } + val newExpressionId = NamedExpression.newExprId + val parentTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) + val childTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated) + updatedExpression.put(parentTableAttr, childTableAttr) // returning alias with child attribute reference Alias(aggExp, - name)(NamedExpression.newExprId, + name)(newExpressionId, alias.qualifier).asInstanceOf[NamedExpression] case alias@Alias(expression: Expression, name) => val updatedExp = if (expression.isInstanceOf[ScalaUDF] && expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) { - expression.asInstanceOf[ScalaUDF].transform { - case attr: AttributeReference => - val childAttributeReference = getChildAttributeReference(aggDataMapSchema, - attr, - attributes, - timeseriesFunction = - expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value.toString) - childAttributeReference - } - } else { - expression.transform{ - case attr: AttributeReference => - val childAttributeReference = getChildAttributeReference(aggDataMapSchema, - attr, - attributes) - childAttributeReference + expression.asInstanceOf[ScalaUDF].transform { + case attr: AttributeReference => + val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + attr, + attributes, + timeseriesFunction = + expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value + .toString) + childAttributeReference + } + } else { + expression.transform{ + case attr: AttributeReference => + val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + attr, + attributes) + childAttributeReference + } } - } - Alias(updatedExp, name)(NamedExpression.newExprId, + val newExpressionId = NamedExpression.newExprId + val parentTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated) + val childTableAttr = AttributeReference(name, + alias.dataType, + alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated) + updatedExpression.put(parentTableAttr, childTableAttr) + Alias(updatedExp, name)(newExpressionId, alias.qualifier).asInstanceOf[NamedExpression] } - // transformaing the logical relation + // transforming the logical relation val newChild = child.transform { case _: LogicalRelation => aggPlan @@ -866,7 +803,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggPlan match { case s: SubqueryAlias => s.child case others => others - } + } } // updating the filter expression if present val updatedFilterExpression = if (filterExpression.isDefined) { @@ -882,39 +819,6 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** - * Below method will be used to get the updated sort order attribute - * based on pre aggregate table - * @param sortOrderAttr - * sort order attributes reference - * @param aggregateExpressions - * aggregate expression - * @return updated sortorder attribute - */ - def getUpdatedSortOrderExpression(sortOrderAttr: AttributeReference, - aggregateExpressions: Seq[NamedExpression]): Expression = { - val updatedExpression = aggregateExpressions collectFirst { - // in case of alias we need to match with alias name and when alias is not present - // we need to compare with attribute reference name - case alias@Alias(attr: AttributeReference, name) - if attr.name.equalsIgnoreCase(sortOrderAttr.name) || - name.equalsIgnoreCase(sortOrderAttr.name) => - AttributeReference(name, - sortOrderAttr.dataType, - sortOrderAttr.nullable, - sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) - case alias@Alias(_: Expression, name) - if name.equalsIgnoreCase(sortOrderAttr.name) => - AttributeReference(name, - sortOrderAttr.dataType, - sortOrderAttr.nullable, - sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) - } - // any case it will match the condition, so no need to check whether updated expression is empty - // or not - updatedExpression.get - } - - /** * Below method will be used to get the aggregate expression based on match * Aggregate expression updation rules * 1 Change the count AggregateExpression to Sum as count @@ -927,166 +831,120 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * table will be created with two columns one for sum(column) and count(column) * to support rollup * - * @param aggExp - * aggregate expression - * @param dataMapSchema - * child data map schema - * @param attributes - * child logical relation + * @param aggExp aggregate expression + * @param dataMapSchema child data map schema + * @param attributes child logical relation + * @param parentTable parent carbon table + * @param parentLogicalPlan logical relation * @return updated expression */ def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression, - dataMapSchema: DataMapSchema, - attributes: Seq[AttributeReference]): + dataMapSchema: AggregationDataMapSchema, + attributes: Seq[AttributeReference], + parentTable: CarbonTable, + parentLogicalPlan: LogicalPlan, + aggExpColumnMapping: mutable.LinkedHashSet[AggExpToColumnMappingModel]): Expression = { + // get the updated aggregate expression, in case of average column + // it will be divided in two aggergate expression + val updatedAggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp) + // get the attributes to be updated for child table + val attrs = aggExpColumnMapping.collect { + case (schemaAggExpModel) + if updatedAggExp + .exists(p => + schemaAggExpModel.expression == + PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes)) => + attributes filter (_.name.equalsIgnoreCase( + schemaAggExpModel.columnSchema.get.asInstanceOf[ColumnSchema].getColumnName)) + }.flatten + aggExp.aggregateFunction match { + case Sum(MatchCastExpression(_, changeDataType: DataType)) => + AggregateExpression(Sum(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false) + case Sum(_) => + AggregateExpression(Sum(attrs.head), aggExp.mode, isDistinct = false) + case Max(MatchCastExpression(_, changeDataType: DataType)) => + AggregateExpression(Max(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false) + case Max(_) => + AggregateExpression(Max(attrs.head), aggExp.mode, isDistinct = false) + case Min(MatchCastExpression(_, changeDataType: DataType)) => + AggregateExpression(Min(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false) + case Min(_) => + AggregateExpression(Min(attrs.head), aggExp.mode, isDistinct = false) // Change the count AggregateExpression to Sum as count // is already calculated so in case of aggregate table // we need to apply sum to get the count - case count@Count(Seq(attr: AttributeReference)) => - AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - count.prettyName), - LongType)), - aggExp.mode, - isDistinct = false) - case sum@Sum(attr: AttributeReference) => - AggregateExpression(Sum(getChildAttributeReference(dataMapSchema, - attr, - attributes, - sum.prettyName)), - aggExp.mode, - isDistinct = false) - case max@Max(attr: AttributeReference) => - AggregateExpression(Max(getChildAttributeReference(dataMapSchema, - attr, - attributes, - max.prettyName)), - aggExp.mode, - isDistinct = false) - case min@Min(attr: AttributeReference) => - AggregateExpression(Min(getChildAttributeReference(dataMapSchema, - attr, - attributes, - min.prettyName)), - aggExp.mode, - isDistinct = false) - case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - sum.prettyName), - changeDataType)), - aggExp.mode, - isDistinct = false) - case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - min.prettyName), - changeDataType)), - aggExp.mode, - isDistinct = false) - case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - max.prettyName), - changeDataType)), - aggExp.mode, - isDistinct = false) - + case Count(Seq(expression: Expression)) => + AggregateExpression(Sum(Cast(attrs.head, LongType)), aggExp.mode, isDistinct = false) // In case of average aggregate function select 2 columns from aggregate table // with aggregation sum and count. // Then add divide(sum(column with sum), sum(column with count)). - case Average(attr: AttributeReference) => - Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema, - attr, - attributes, - "sum")), + case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + Divide(AggregateExpression(Sum(Cast( + attrs.head, + DoubleType)), aggExp.mode, isDistinct = false), - AggregateExpression(Sum(getChildAttributeReference(dataMapSchema, - attr, - attributes, - "count")), + AggregateExpression(Sum(Cast( + attrs.last, + DoubleType)), aggExp.mode, isDistinct = false)) // In case of average aggregate function select 2 columns from aggregate table // with aggregation sum and count. // Then add divide(sum(column with sum), sum(column with count)). - case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - "sum"), - DoubleType)), + case Average(exp: Expression) => + Divide(AggregateExpression(Sum(attrs.head), aggExp.mode, isDistinct = false), - AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, - attr, - attributes, - "count"), - DoubleType)), + AggregateExpression(Sum(attrs.last), aggExp.mode, isDistinct = false)) } } - /** * Method to get the carbon table and table name - * - * @param parentLogicalRelation - * parent table relation - * @return tuple of carbon table and table name + * @param parentLogicalRelation parent table relation + * @return tuple of carbon table */ - def getCarbonTableAndTableName(parentLogicalRelation: LogicalRelation): (CarbonTable, String) = { + def getCarbonTable(parentLogicalRelation: LogicalRelation): CarbonTable = { val carbonTable = parentLogicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] .carbonRelation .metaData.carbonTable - val tableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier - .getTableName - (carbonTable, tableName) + carbonTable } /** * Below method will be used to get the query columns from plan - * - * @param groupByExpression - * group by expression - * @param aggregateExpressions - * aggregate expression - * @param carbonTable - * parent carbon table - * @param tableName - * parent table name - * @param set - * list of attributes + * @param groupByExpression group by expression + * @param aggregateExpressions aggregate expression + * @param carbonTable parent carbon table + * @param queryColumns list of attributes * @return plan is valid */ def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression], aggregateExpressions: Seq[NamedExpression], - carbonTable: CarbonTable, tableName: String, - set: scala.collection.mutable.HashSet[QueryColumn]): Boolean = { + carbonTable: CarbonTable, + queryColumns: scala.collection.mutable.HashSet[QueryColumn], + aggreagteExps: scala.collection.mutable.HashSet[AggregateExpression]): Boolean = { + groupByExpression foreach { expression => + extractColumnFromExpression(expression, queryColumns, carbonTable) + } aggregateExpressions.map { case attr: AttributeReference => - set += getQueryColumn(attr.name, - carbonTable, - tableName) + queryColumns += getQueryColumn(attr.name, + carbonTable) case Alias(attr: AttributeReference, _) => - set += getQueryColumn(attr.name, - carbonTable, - tableName); + queryColumns += getQueryColumn(attr.name, + carbonTable); case Alias(attr: AggregateExpression, _) => if (attr.isDistinct) { return false } - val queryColumn = validateAggregateFunctionAndGetFields(carbonTable, - attr.aggregateFunction, - tableName) - if (queryColumn.nonEmpty) { - set ++= queryColumn + val aggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(attr) + if (aggExp.nonEmpty) { + aggreagteExps ++= aggExp } else { return false } @@ -1095,18 +953,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") && CarbonUtil.hasTimeSeriesDataMap(carbonTable)) { - set += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0) + queryColumns += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0) .asInstanceOf[AttributeReference].name, carbonTable, - tableName, timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal] .value.toString) } else { expression.transform { case attr: AttributeReference => - set += getQueryColumn(attr.name, - carbonTable, - tableName) + queryColumns += getQueryColumn(attr.name, + carbonTable) attr } } @@ -1115,215 +971,112 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** - * Below method will be used to validate aggregate function and get the attribute information - * which is applied on select query. - * Currently sum, max, min, count, avg is supported - * in case of any other aggregate function it will return empty sequence - * In case of avg it will return two fields one for count - * and other of sum of that column to support rollup - * - * @param carbonTable - * parent table - * @param aggFunctions - * aggregation function - * @param tableName - * parent table name - * @return list of fields - */ - def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable, - aggFunctions: AggregateFunction, - tableName: String - ): Seq[QueryColumn] = { - val changedDataType = true - aggFunctions match { - case sum@Sum(attr: AttributeReference) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - sum.prettyName)) - case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - sum.prettyName, - changeDataType.typeName, - changedDataType)) - case count@Count(Seq(attr: AttributeReference)) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - count.prettyName)) - case min@Min(attr: AttributeReference) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - min.prettyName)) - case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - min.prettyName, - changeDataType.typeName, - changedDataType)) - case max@Max(attr: AttributeReference) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - max.prettyName)) - case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - max.prettyName, - changeDataType.typeName, - changedDataType)) - // in case of average need to return two columns - // sum and count of the column to added during table creation to support rollup - case Average(attr: AttributeReference) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - "sum" - ), getQueryColumn(attr.name, - carbonTable, - tableName, - "count" - )) - // in case of average need to return two columns - // sum and count of the column to added during table creation to support rollup - case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) => - Seq(getQueryColumn(attr.name, - carbonTable, - tableName, - "sum", - changeDataType.typeName, - changedDataType), getQueryColumn(attr.name, - carbonTable, - tableName, - "count", - changeDataType.typeName, - changedDataType)) - case _ => - Seq.empty - } - } - - - - /** * Below method will be used to get the query column object which * will have details of the column and its property * - * @param columnName - * parent column name - * @param carbonTable - * parent carbon table - * @param tableName - * parent table name - * @param aggFunction - * aggregate function applied - * @param dataType - * data type of the column - * @param isChangedDataType - * is cast is applied on column - * @param isFilterColumn - * is filter is applied on column + * @param columnName parent column name + * @param carbonTable parent carbon table + * @param isFilterColumn is filter is applied on column * @return query column */ def getQueryColumn(columnName: String, carbonTable: CarbonTable, - tableName: String, - aggFunction: String = "", - dataType: String = "", - isChangedDataType: Boolean = false, isFilterColumn: Boolean = false, timeseriesFunction: String = ""): QueryColumn = { - val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase) + val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase) if(null == columnSchema) { null } else { - if (isChangedDataType) { - new QueryColumn(columnSchema.getColumnSchema, - columnSchema.getDataType.getName, - aggFunction.toLowerCase, - isFilterColumn, - timeseriesFunction.toLowerCase) - } else { new QueryColumn(columnSchema.getColumnSchema, - CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), - aggFunction.toLowerCase, isFilterColumn, timeseriesFunction.toLowerCase) - } } } } -object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { - +/** + * Data loading rule class to validate and update the data loading query plan + * Validation rule: + * 1. update the avg aggregate expression with two columns sum and count + * 2. Remove duplicate sum and count expression if already there in plan + * @param sparkSession spark session + */ +case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) + extends Rule[LogicalPlan] { + lazy val parser = new CarbonSpark2SqlParser override def apply(plan: LogicalPlan): LogicalPlan = { - val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression] + val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel] + val namedExpressionList = scala.collection.mutable.LinkedHashSet.empty[NamedExpression] plan transform { - case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => + case aggregate@Aggregate(_, + aExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + if validateAggregateExpressions(aExp) && + logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => aExp.foreach { - case alias: Alias => - validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias) - case _: UnresolvedAlias => - case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr) + case attr: AttributeReference => + namedExpressionList += attr + case alias@Alias(_: AttributeReference, _) => + namedExpressionList += alias + case alias@Alias(aggExp: AggregateExpression, name) => + // get the updated expression for avg convert it to two expression + // sum and count + val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp) + // if size is more than one then it was for average + if(expressions.size > 1) { + val sumExp = PreAggregateUtil.normalizeExprId( + expressions.head, + aggregate.allAttributes) + // get the logical plan fro count expression + val countExp = PreAggregateUtil.normalizeExprId( + expressions.last, + aggregate.allAttributes) + // check with same expression already sum is present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) { + namedExpressionList += + Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId, + alias.qualifier, + Some(alias.metadata), + alias.isGenerated) + validExpressionsMap += AggExpToColumnMappingModel(sumExp) + } + // check with same expression already count is present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) { + namedExpressionList += + Alias(expressions.last, name + "_ count")(NamedExpression.newExprId, + alias.qualifier, + Some(alias.metadata), + alias.isGenerated) + validExpressionsMap += AggExpToColumnMappingModel(countExp) + } + } else { + // get the logical plan for expression + val exp = PreAggregateUtil.normalizeExprId( + expressions.head, + aggregate.allAttributes) + // check with same expression already present then do not add to + // named expression list otherwise update the list and add it to set + if (!validExpressionsMap.contains(AggExpToColumnMappingModel(exp))) { + namedExpressionList+=alias + validExpressionsMap += AggExpToColumnMappingModel(exp) + } + } + case alias@Alias(_: Expression, _) => + namedExpressionList += alias } - aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq) + aggregate.copy(aggregateExpressions = namedExpressionList.toSeq) case plan: LogicalPlan => plan } } - /** - * This method will split the avg column into sum and count and will return a sequence of tuple - * of unique name, alias - * - */ - private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String, - NamedExpression)] = { - alias match { - case udf@Alias(_: ScalaUDF, name) => - Seq((name, udf)) - case alias@Alias(attrExpression: AggregateExpression, _) => - attrExpression.aggregateFunction match { - case Sum(attr: AttributeReference) => - (attr.name + "_sum", alias) :: Nil - case Sum(MatchCastExpression(attr: AttributeReference, _)) => - (attr.name + "_sum", alias) :: Nil - case Count(Seq(attr: AttributeReference)) => - (attr.name + "_count", alias) :: Nil - case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) => - (attr.name + "_count", alias) :: Nil - case Average(attr: AttributeReference) => - Seq((attr.name + "_sum", Alias(attrExpression. - copy(aggregateFunction = Sum(attr), - resultId = NamedExpression.newExprId), attr.name + "_sum")()), - (attr.name, Alias(attrExpression. - copy(aggregateFunction = Count(attr), - resultId = NamedExpression.newExprId), attr.name + "_count")())) - case Average(cast@MatchCastExpression(attr: AttributeReference, _)) => - Seq((attr.name + "_sum", Alias(attrExpression. - copy(aggregateFunction = Sum(cast), - resultId = NamedExpression.newExprId), - attr.name + "_sum")()), - (attr.name, Alias(attrExpression. - copy(aggregateFunction = Count(cast), resultId = - NamedExpression.newExprId), attr.name + "_count")())) - case _ => Seq(("", alias)) - } - - } - } - /** * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not. * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is * valid. - * - * @param namedExpression - * @return + * @param namedExpression named expressions + * @return valid or not */ private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = { val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index 3d84dd3..a1fa382 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -262,7 +262,7 @@ class CarbonAnalyzer(catalog: SessionCatalog, analyzer: Analyzer) extends Analyzer(catalog, conf) { override def execute(plan: LogicalPlan): LogicalPlan = { var logicalPlan = analyzer.execute(plan) - logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan) + logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index f563007..5046541 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -190,7 +190,7 @@ class CarbonAnalyzer(catalog: SessionCatalog, analyzer: Analyzer) extends Analyzer(catalog, conf) { override def execute(plan: LogicalPlan): LogicalPlan = { var logicalPlan = analyzer.execute(plan) - logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan) + logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) } }
