Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1728#discussion_r159459370
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -98,238 +143,499 @@ case class
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
needAnalysis = false
attr
}
+ if(needAnalysis) {
+ needAnalysis = isValidPlan(plan)
+ }
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
- // create buffer to collect all the column and its metadata
information
- val list = scala.collection.mutable.HashSet.empty[QueryColumn]
- var isValidPlan = true
- val carbonTable = plan match {
- // matching the plan based on supported plan
- // if plan is matches with any case it will validate and get all
- // information required for transforming the plan
+ val updatedPlan = transformPreAggQueryPlan(plan)
+ val newPlan = updatePlan(updatedPlan)
+ print(newPlan.toString())
+ newPlan
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // subquery
- case Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
+ /**
+ * Below method will be used to update the child plan
+ * This will be used for updating expression like join condition,
+ * order by, project list etc
+ * @param plan
+ * child plan
+ * @return updated plan
+ */
+ def updatePlan(plan: LogicalPlan) : LogicalPlan = {
+ val updatedPlan = plan transform {
+ case Aggregate(grp, aggExp, child) =>
+ Aggregate(updateExpression(grp), updateNamedExpression(aggExp),
child)
+ case Filter(filterExp, child) =>
+ Filter(updateConditionExpression(Some(filterExp)).get, child)
+ case Project(projectList, child) =>
+ Project(updateNamedExpression(projectList), child)
+ case Sort(sortOrders, global, child) =>
+ Sort(updateSortExpression(sortOrders), global, child)
+ case Join(left, right, joinType, condition) =>
+ Join(left, right, joinType, updateConditionExpression(condition))
+ }
+ updatedPlan
+ }
- // below case for handling filter query
- // When plan has grouping expression, aggregate expression
- // filter expression
- case Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- // getting the columns from filter expression
- if(isValidPlan) {
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ /**
+ * Below method will be used to update the sort expression
+ * @param sortExp
+ * sort order expression in query
+ * @return updated sort expression
+ */
+ def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
+ sortExp map { order =>
+ order.child match {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ SortOrder(newExpression, order.direction)
+ } else {
+ SortOrder(attr, order.direction)
}
- carbonTable
+ }
+ }
+ }
- // When plan has grouping expression, aggregate expression
- // logical relation
- case Aggregate(groupingExp, aggregateExp, logicalRelation:
LogicalRelation)
- // only carbon query plan is supported checking whether logical
relation is
- // is for carbon
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- // if it is valid plan then extract the query columns
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- carbonTable
- // case for handling aggregation, order by
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- }
- carbonTable
- // case for handling aggregation, order by and filter
- case Project(projectList,
- Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation:
LogicalRelation)))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
- carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+ /**
+ * Below method will be used to update the expression like group by
expression
+ * @param expressions
+ * sequence of expression like group by
+ * @return updated expressions
+ */
+ def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
+ val newExp = expressions map { expression =>
+ expression transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
+ if (childExp.isDefined) {
+ val newExpression = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExpression
+ } else {
+ attr
}
- if (isValidPlan) {
- list ++
- extractQueryColumnForOrderBy(Some(projectList), sortOrders,
carbonTable, tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
+ }
+ }
+ newExp
+ }
+
+ /**
+ * Below method will be used to updated the named expression like
aggregate expression
+ * @param namedExpression
+ * any named expression like aggregate expression
+ * @return updated named expression
+ */
+ def updateNamedExpression(namedExpression: Seq[NamedExpression]) :
Seq[NamedExpression] = {
+ namedExpression map {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ case alias@Alias(exp, name) =>
+ val newExp = exp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if (childExp.isDefined) {
+ val newExp = AttributeReference(
+ childExp.get._2.name,
+ childExp.get._2.dataType,
+ childExp.get._2.nullable,
+ childExp.get._2.metadata)(childExp.get._2.exprId,
attr.qualifier, attr.isGenerated)
+ newExp
+ } else {
+ attr
+ }
+ }
+ Alias(newExp, name)(alias.exprId, alias.qualifier,
Some(alias.metadata), alias.isGenerated)
+ }
+ }
+
+ /**
+ * Below method will be used to updated condition expression
+ * @param conditionExp
+ * any condition expression join condition or filter condition
+ * @return updated condition expression
+ */
+ def updateConditionExpression(conditionExp: Option[Expression]):
Option[Expression] = {
+ if (conditionExp.isDefined) {
+ val filterExp = conditionExp.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ val childExp = updatedExpression.find(p => p._1.sameRef(attr))
+ if(childExp.isDefined) {
+ childExp.get._2
+ } else {
+ attr
}
- carbonTable
- // case for handling aggregation with order by when only
projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ })
+ } else {
+ conditionExp
+ }
+ }
+
+ /**
+ * Below method will be used to validate and transform the main table
plan to child table plan
+ * rules for transforming is as below.
+ * 1. Grouping expression rules
+ * 1.1 Change the parent attribute reference for of group expression
+ * to child attribute reference
+ *
+ * 2. Aggregate expression rules
+ * 2.1 Change the parent attribute reference for of group expression
to
+ * child attribute reference
+ * 2.2 Change the count AggregateExpression to Sum as count
+ * is already calculated so in case of aggregate table
+ * we need to apply sum to get the count
+ * 2.2 In case of average aggregate function select 2 columns from
aggregate table with
+ * aggregation sum and count. Then add divide(sum(column with sum),
sum(column with count)).
+ * Note: During aggregate table creation for average table will be
created with two columns
+ * one for sum(column) and count(column) to support rollup
+ * 3. Filter Expression rules.
+ * 3.1 Updated filter expression attributes with child table
attributes
+ * 4. Update the Parent Logical relation with child Logical relation
+ * 5. timeseries function
+ * 5.1 validate parent table has timeseries datamap
+ * 5.2 timeseries function is valid function or not
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ // case for aggregation query
+ case agg@Aggregate(grExp,
+ aggExp,
+ child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+ if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ val carbonTable = getCarbonTable(logicalRelation)
+ val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+ val aggregateExpressions =
scala.collection.mutable.HashSet.empty[AggregateExpression]
+ val isValidPlan = extractQueryColumnsFromAggExpression(
+ grExp,
+ aggExp,
+ carbonTable,
+ list,
+ aggregateExpressions)
+ if(isValidPlan) {
+ val (aggDataMapSchema, childPlan) =
getChildDataMapForTransformation(list,
+ aggregateExpressions,
carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
- carbonTable = carbonTable,
- tableName = tableName)
+ logicalRelation)
+ if(null != aggDataMapSchema && null!= childPlan) {
+ val attributes =
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ None,
+ aggDataMapSchema,
+ attributes,
+ childPlan,
+ carbonTable,
+ logicalRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ newChild)
+ } else {
+ agg
}
- carbonTable
- // case for handling aggregation with order by and filter when
only projection column exits
- case Sort(sortOrders,
- _,
- Aggregate(groupingExp,
- aggregateExp,
- Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation:
LogicalRelation))))
- if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
- metaData.hasAggregateDataMapSchema =>
- val (carbonTable, tableName) =
getCarbonTableAndTableName(logicalRelation)
- isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
- aggregateExp,
+ } else {
+ agg
+ }
+ // case of handling aggregation query with filter
+ case agg@Aggregate(grExp,
+ aggExp,
+ Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation:
LogicalRelation)))
+ if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+ metaData.hasAggregateDataMapSchema =>
+ val carbonTable = getCarbonTable(logicalRelation)
+ val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+ val aggregateExpressions =
scala.collection.mutable.HashSet.empty[AggregateExpression]
+ var isValidPlan = extractQueryColumnsFromAggExpression(
+ grExp,
+ aggExp,
+ carbonTable,
+ list,
+ aggregateExpressions)
+ // getting the columns from filter expression
+ isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(expression)
+ if (isValidPlan) {
+ isValidPlan = extractQueryColumnFromFilterExp(expression, list,
carbonTable)
+ }
+ if(isValidPlan) {
+ val (aggDataMapSchema, childPlan) =
getChildDataMapForTransformation(list,
+ aggregateExpressions,
carbonTable,
- tableName,
- list)
- if(isValidPlan) {
- isValidPlan =
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
- }
- if(isValidPlan) {
- list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
- carbonTable = carbonTable,
- tableName = tableName)
- isValidPlan = extractQueryColumnFromFilterExp(filterExp, list,
carbonTable, tableName)
- }
- carbonTable
- case _ =>
- isValidPlan = false
- null
- }
- if (isValidPlan && null != carbonTable) {
- isValidPlan = isSpecificSegmentPresent(carbonTable)
- }
- // if plan is valid then update the plan with child attributes
- if (isValidPlan) {
- // getting all the projection columns
- val listProjectionColumn = list
- .filter(queryColumn => queryColumn.getAggFunction.isEmpty &&
!queryColumn.isFilterColumn)
- .toList
- // getting all the filter columns
- val listFilterColumn = list
- .filter(queryColumn => queryColumn.getAggFunction.isEmpty &&
queryColumn.isFilterColumn)
- .toList
- // getting all the aggregation columns
- val listAggregationColumn = list.filter(queryColumn =>
!queryColumn.getAggFunction.isEmpty)
- .toList
- // create a query plan object which will be used to select the
list of pre aggregate tables
- // matches with this plan
- val queryPlan = new QueryPlan(listProjectionColumn.asJava,
- listAggregationColumn.asJava,
- listFilterColumn.asJava)
- // create aggregate table selector object
- val aggregateTableSelector = new AggregateTableSelector(queryPlan,
carbonTable)
- // select the list of valid child tables
- val selectedDataMapSchemas =
aggregateTableSelector.selectPreAggDataMapSchema()
- // if it does not match with any pre aggregate table return the
same plan
- if (!selectedDataMapSchemas.isEmpty) {
- // filter the selected child schema based on size to select the
pre-aggregate tables
- // that are nonEmpty
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relationBuffer = selectedDataMapSchemas.asScala.map {
selectedDataMapSchema =>
- val identifier = TableIdentifier(
- selectedDataMapSchema.getRelationIdentifier.getTableName,
-
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
- val carbonRelation =
-
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
- val relation =
sparkSession.sessionState.catalog.lookupRelation(identifier)
- (selectedDataMapSchema, carbonRelation, relation)
- }.filter(_._2.sizeInBytes != 0L)
- if (relationBuffer.isEmpty) {
- // If the size of relation Buffer is 0 then it means that none
of the pre-aggregate
- // tables have date yet.
- // In this case we would return the original plan so that the
query hits the parent
- // table.
- plan
+ logicalRelation)
+ if(null != aggDataMapSchema && null!= childPlan) {
+ val attributes =
childPlan.output.asInstanceOf[Seq[AttributeReference]]
+ val (updatedGroupExp, updatedAggExp, newChild,
updatedFilterExpression) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ Some(expression),
+ aggDataMapSchema,
+ attributes,
+ childPlan,
+ carbonTable,
+ logicalRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ Filter(updatedFilterExpression.get,
+ newChild))
} else {
- // If the relationBuffer is nonEmpty then find the table with
the minimum size.
- val (aggDataMapSchema, _, relation) =
relationBuffer.minBy(_._2.sizeInBytes)
- val newRelation = new
FindDataSourceTable(sparkSession).apply(relation)
- // transform the query plan based on selected child schema
- transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+ agg
}
} else {
- plan
+ agg
}
+ }
+ updatedPlan
+ }
+
+ /**
+ * Below method will be used to validate query plan and get the proper
aggregation data map schema
+ * and child relation plan object if plan is valid for transformation
+ * @param queryColumns
+ * list of query columns from projection and filter
+ * @param aggregateExpressions
+ * list of aggregate expression (aggregate function)
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @return if plan is valid then aggregation data map schema and its
relation plan
+ */
+ def getChildDataMapForTransformation(queryColumns:
scala.collection.mutable.HashSet[QueryColumn],
+ aggregateExpressions:
scala.collection.mutable.HashSet[AggregateExpression],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation): (AggregationDataMapSchema,
LogicalPlan) = {
+ // getting all the projection columns
+ val listProjectionColumn = queryColumns
+ .filter(queryColumn => !queryColumn.isFilterColumn)
+ .toList
+ // getting all the filter columns
+ val listFilterColumn = queryColumns
+ .filter(queryColumn => queryColumn.isFilterColumn)
+ .toList
+ val isProjectionColumnPresent = (listProjectionColumn.size +
listFilterColumn.size) > 0
+ // create a query plan object which will be used to select the list of
pre aggregate tables
+ // matches with this plan
+ val queryPlan = new QueryPlan(listProjectionColumn.asJava,
listFilterColumn.asJava)
+ // create aggregate table selector object
+ val aggregateTableSelector = new AggregateTableSelector(queryPlan,
carbonTable)
+ // select the list of valid child tables
+ val selectedDataMapSchemas =
aggregateTableSelector.selectPreAggDataMapSchema()
+ // query has only aggregate expression then selected data map will be
empty
+ // the validate all the child data map otherwise validate selected
data map
+ var selectedAggMaps = if (isProjectionColumnPresent) {
+ selectedDataMapSchemas
+ } else {
+ carbonTable.getTableInfo.getDataMapSchemaList
+ }
+ val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
+ PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp,
+ carbonTable.getTableName,
+ carbonTable.getDatabaseName,
+ logicalRelation,
+ sparkSession,
+ parser)
+ }.toSeq
+ // if query does not have any aggregate function no need to validate
the same
+ if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
+ selectedAggMaps =
validateAggregateExpression(selectedAggMaps.asScala.toSeq,
+ carbonTable,
+ logicalRelation,
+ aggExpLogicalPlans).asJava
+ }
+ // if it does not match with any pre aggregate table return the same
plan
+ if (!selectedAggMaps.isEmpty) {
+ // filter the selected child schema based on size to select the
pre-aggregate tables
+ // that are nonEmpty
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relationBuffer = selectedAggMaps.asScala.map {
selectedDataMapSchema =>
+ val identifier = TableIdentifier(
+ selectedDataMapSchema.getRelationIdentifier.getTableName,
+
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+ val carbonRelation =
+
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val relation =
sparkSession.sessionState.catalog.lookupRelation(identifier)
+ (selectedDataMapSchema, carbonRelation, relation)
+ }.filter(_._2.sizeInBytes != 0L)
+ if (relationBuffer.isEmpty) {
+ // If the size of relation Buffer is 0 then it means that none of
the pre-aggregate
+ // tables have date yet.
+ // In this case we would return the original plan so that the
query hits the parent
+ // table.
+ (null, null)
} else {
- plan
+ // If the relationBuffer is nonEmpty then find the table with the
minimum size.
+ val (aggDataMapSchema, _, relation) =
relationBuffer.minBy(_._2.sizeInBytes)
+ val newRelation = new
FindDataSourceTable(sparkSession).apply(relation)
+ (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema],
newRelation)
}
+ } else {
+ (null, null)
}
}
+ /**
+ * Below method will be used to validate aggregate expression with the
data map
+ * and will return the selected valid data maps
+ * @param selectedDataMap
+ * list of data maps
+ * @param carbonTable
+ * parent carbon table
+ * @param logicalRelation
+ * parent logical relation
+ * @param queryAggExpLogicalPlans
+ * query agg expression logical plan
+ * @return valid data map
+ */
+ def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
+ carbonTable: CarbonTable,
+ logicalRelation: LogicalRelation,
+ queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
+ def validateDataMap(dataMap: DataMapSchema,
+ aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
+ val mappingModel = getLogicalPlanForAggregateExpression(dataMap,
+ carbonTable,
+ logicalRelation)
+ aggExpLogicalPlans.forall{
+ p => mappingModel.exists(m => p.sameResult(m.logicalPlan))
--- End diff --
move p => to up
---