Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159053493
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession:
SparkSession) extends Rule
}
}
+  /**
+   * Validates the query logical plan and collects the details needed to select a proper
+   * aggregate table.
+   *
+   * The plan is walked with `transform`, but no node is rewritten: every case returns the
+   * node it matched and only records what it found in the local state and in the mutable
+   * sets supplied by the caller.
+   *
+   * @param logicalPlan
+   * actual query logical plan
+   * @param list
+   * set of query columns present in the plan; columns found here are added to it in place
+   * @param qAggExprs
+   * set of aggregate expressions; handed to extractQueryColumnsFromAggExpression and to the
+   * recursive validation of child plans
+   * @return whether the plan is valid for transformation, the parent table and the parent
+   *         logical relation (the last two stay null when no supported node was matched)
+   */
+  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
+      list: scala.collection.mutable.HashSet[QueryColumn],
+      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean,
+    CarbonTable, LogicalRelation) = {
+    var isValidPlan = false
+    var pTable: CarbonTable = null
+    var qLRelation: LogicalRelation = null
+
+    // only carbon query plans are supported: the relation has to be a carbon relation whose
+    // metadata reports an aggregate data map schema
+    def isAggDataMapParent(relation: LogicalRelation): Boolean = {
+      relation.relation match {
+        case carbon: CarbonDatasourceHadoopRelation =>
+          carbon.carbonRelation.metaData.hasAggregateDataMapSchema
+        case _ => false
+      }
+    }
+
+    // validates a child plan recursively and takes over its result as the current state
+    def adoptChildResult(child: LogicalPlan): Unit = {
+      val (childValid, childTable, childRelation) =
+        validatePlanAndGetFields(child, list, qAggExprs)
+      pTable = childTable
+      qLRelation = childRelation
+      isValidPlan = childValid
+    }
+
+    logicalPlan.transform {
+      // to handle filter expression
+      case filter@Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+        if isAggDataMapParent(logicalRelation) =>
+        qLRelation = logicalRelation
+        pTable = getCarbonTableAndTableName(logicalRelation)
+        // getting the columns from filter expression; a filter with a predicate subquery
+        // leaves the validity flag untouched
+        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
+          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable)
+        }
+        filter
+      // to handle aggregate expression
+      case agg@Aggregate(groupingExp,
+        aggregateExp,
+        CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+        if isAggDataMapParent(logicalRelation) =>
+        qLRelation = logicalRelation
+        pTable = getCarbonTableAndTableName(logicalRelation)
+        isValidPlan = extractQueryColumnsFromAggExpression(
+          groupingExp,
+          aggregateExp,
+          pTable,
+          list,
+          qAggExprs)
+        agg
+      // to handle aggregate expression with filter
+      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
+        adoptChildResult(filter)
+        if (isValidPlan) {
+          isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs)
+        }
+        agg
+      // to handle projection with order by
+      case proj@Project(projectList, sort@Sort(_, _, _)) =>
+        adoptChildResult(sort)
+        if (isValidPlan) {
+          // `++=` adds to the caller's set in place; a plain `++` would only build a new
+          // set that is thrown away immediately
+          list ++= extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+        }
+        proj
+      // to handle only projection
+      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
+        adoptChildResult(agg)
+        if (isValidPlan) {
+          list ++= extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable)
+        }
+        proj
+      // case for handling aggregation with order by when only projection column exists
+      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
+        adoptChildResult(agg)
+        if (isValidPlan) {
+          list ++= extractQueryColumnForOrderBy(None, sortOrders, pTable)
+        }
+        sort
+    }
+    (isValidPlan, pTable, qLRelation)
+  }
+
+  /**
+   * Filters the given data maps down to those that can serve every aggregate expression of
+   * the query.
+   *
+   * A data map is kept when each query aggregate expression plan gives the same result
+   * (as decided by `sameResult`) as at least one of the plans returned for that data map
+   * by getLogicalPlanForAggregateExpression.
+   * @param selectedDataMap
+   * list of data maps
+   * @param carbonTable
+   * parent carbon table
+   * @param logicalRelation
+   * parent logical relation
+   * @param queryAggExpLogicalPlans
+   * logical plans of the query aggregate expressions
+   * @return the data maps that matched, in their original order
+   */
+  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
+      carbonTable: CarbonTable,
+      logicalRelation: LogicalRelation,
+      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
+    selectedDataMap.filter { candidate =>
+      // looked up once per data map, before any query plan is compared
+      val dataMapPlans =
+        getLogicalPlanForAggregateExpression(candidate, carbonTable, logicalRelation).keys
+      queryAggExpLogicalPlans.forall { queryPlan =>
+        dataMapPlans.exists(dataMapPlan => queryPlan.sameResult(dataMapPlan))
+      }
+    }
+  }
+
+  /**
+   * Rewrites the logical plan of an expression so that every LogicalRelation node in it
+   * carries the relation of the parent table logical relation.
+   * @param logicalPlan
+   * plan whose LogicalRelation nodes are to be rewritten
+   * @param logicalRelation
+   * parent table logical relation that supplies the replacement relation
+   * @return the plan with its LogicalRelation nodes updated
+   */
+  def updateLogicalRelation(logicalPlan: LogicalPlan,
+      logicalRelation: LogicalRelation): LogicalPlan = {
+    val useParentRelation: PartialFunction[LogicalPlan, LogicalPlan] = {
+      case childRelation: LogicalRelation =>
+        childRelation.copy(relation = logicalRelation.relation)
+    }
+    logicalPlan.transform(useParentRelation)
+  }
+
+  /**
+   * Returns, for the given child data map, the logical plan of each of its aggregate
+   * expressions together with the column schema it maps to.
+   *
+   * When the data map already holds this mapping it is returned as is; otherwise the
+   * mapping is built from the child select query, stored in the aggregation data map and
+   * then returned.
+   * @param selectedDataMap
+   * child data map
+   * @param carbonTable
+   * parent table
+   * @param logicalRelation
+   * logical relation of actual plan
+   * @return map of logical plan for each aggregate expression in child query and its column
+   *         mapping
+   */
+  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema,
+      carbonTable: CarbonTable,
+      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
+    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
+    val storedMapping = aggDataMapSchema.getAggregateExpressionToColumnMapping
+    if (storedMapping != null) {
+      // mapping is already present in the data map, so hand back the same
+      storedMapping.asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap
+    } else {
+      // add preAGG UDF to avoid all the PreAggregate rule
+      val parser = new CarbonSpark2SqlParser()
+      val childDataMapQueryString =
+        parser.addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
+      // logical plan of the child query and the aggregate expressions it contains
+      val childAggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
+      val childAggExprs = getAggregateExpFromChildDataMap(childAggPlan)
+      // columns ordered by schema ordinal so the positional search gives a proper result
+      val columnsByOrdinal = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
+        .sortBy(_.getSchemaOrdinal)
+      // in case of average the child table has two columns stored in sequence, so the
+      // ordinal to search from is carried along from one expression to the next
+      val (_, planToColumn) =
+        childAggExprs.foldLeft((0, Map.empty[LogicalPlan, ColumnSchema])) {
+          case ((searchFrom, mapping), aggExp) =>
+            // for each aggregate expression get logical plan
+            val aggExpPlan = getLogicalPlanFromAggExp(aggExp,
+              carbonTable.getTableName,
+              carbonTable.getDatabaseName,
+              logicalRelation)
+            val column =
+              aggDataMapSchema.getAggColumnBasedOnIndex(searchFrom, columnsByOrdinal.asJava)
+            // the next expression searches from the ordinal right after this column
+            (column.getSchemaOrdinal + 1, mapping + (aggExpPlan -> column))
+        }
+      // store the mapping in data map
+      aggDataMapSchema.setAggregateExpressionToColumnMapping(planToColumn.asJava)
+      // return the mapping
+      planToColumn
+    }
+  }
+
+
+ /**
+ * Below method will be used to get the logical plan from aggregate
expression
+ * @param aggExp
+ * aggregate expression
+ * @param tableName
+ * parent table name
+ * @param databaseName
+ * database name
+ * @param logicalRelation
+ * logical relation
+ * @return logical plan
+ */
+ def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
+ tableName: String,
+ databaseName: String,
+ logicalRelation: LogicalRelation): LogicalPlan = {
+ // adding the preAGG UDF, so pre aggregate data loading rule and query
rule will not
+ // be applied
+ val query = new CarbonSpark2SqlParser()
--- End diff --
Don't create parser every time, pass from caller
---