Repository: carbondata Updated Branches: refs/heads/master 77217b370 -> f70e6d700
[CARBONDATA-1740][Pre-Aggregate] Fixed order by issue for pre-aggregate tables. Problem: Order by queries fail when a pre-aggregate table is involved. Solution: The pre-aggregate rules did not handle the order by scenario; this PR adds handling for it. This closes #1544 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f70e6d70 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f70e6d70 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f70e6d70 Branch: refs/heads/master Commit: f70e6d7008cb5300581da4307ef55ec444999577 Parents: 77217b3 Author: kumarvishal <[email protected]> Authored: Mon Nov 20 16:36:11 2017 +0530 Committer: ravipesala <[email protected]> Committed: Tue Dec 5 08:23:48 2017 +0530 ---------------------------------------------------------------------- .../schema/table/AggregationDataMapSchema.java | 3 +- .../carbondata/core/preagg/QueryColumn.java | 24 ++ .../TestPreAggregateTableSelection.scala | 40 ++ .../sql/hive/CarbonPreAggregateRules.scala | 408 +++++++++++++++++-- .../src/main/spark2.1/CarbonSessionState.scala | 41 +- 5 files changed, 465 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java index 87c07f4..9bfb22c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java @@ -93,7 +93,8 @@ public class 
AggregationDataMapSchema extends DataMapSchema { for (ColumnSchema columnSchema : listOfColumns) { List<ParentColumnTableRelation> parentColumnTableRelations = columnSchema.getParentColumnTableRelations(); - if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) { + if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1 + && parentColumnTableRelations.get(0).getColumnName().equals(columName)) { return columnSchema; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java index a62d556..c889716 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java @@ -67,4 +67,28 @@ public class QueryColumn { public boolean isFilterColumn() { return isFilterColumn; } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + QueryColumn that = (QueryColumn) o; + if (isFilterColumn != that.isFilterColumn) { + return false; + } + if (!columnSchema.equals(that.columnSchema)) { + return false; + } + return aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null; + } + + @Override public int hashCode() { + int result = columnSchema.hashCode(); + result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0); + result = 31 * result + (isFilterColumn ? 
1 : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index c29beec..5dfe447 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -150,6 +150,46 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { val df = sql("select L_RETURNFLAG,L_LINESTATUS,sum(L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem group by L_RETURNFLAG, L_LINESTATUS") preAggTableValidator(df.queryExecution.analyzed, "lineitem_agr_lineitem") } + test("test PreAggregate table selection 20") { + val df = sql("select name from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + test("test PreAggregate table selection 21") { + val df = sql("select name as NewName from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + test("test PreAggregate table selection 22") { + val df = sql("select name, sum(age) from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test PreAggregate table selection 23") { + val df = sql("select name as NewName, sum(age) as sum from mainTable group 
by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test PreAggregate table selection 24") { + val df = sql("select name as NewName, sum(age) as sum from mainTable where name='vishal' group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test PreAggregate table selection 25") { + val df = sql("select name as NewName, sum(age) as sum from mainTable where city = 'Bangalore' group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable") + } + + test("test PreAggregate table selection 26") { + val df = sql("select name from mainTable where name='vishal' group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + test("test PreAggregate table selection 27") { + val df = sql("select name as NewName from mainTable where name='vishal' group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ var isValidPlan = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 2875817..2b74ed7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -18,12 +18,12 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql._ import 
org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException import org.apache.spark.sql.CarbonExpressions.MatchCast +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -64,6 +65,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil * 3. Filter Expression rules. * 3.1 Updated filter expression attributes with child table attributes * 4. Update the Parent Logical relation with child Logical relation + * 5. Order By Query rules. 
+ * 5.1 Update project list based on updated aggregate expression + * 5.2 Update sort order attributes based on pre aggregate table * * @param sparkSession * spark session @@ -95,7 +99,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule plan } else { // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + val list = scala.collection.mutable.HashSet.empty[QueryColumn] var isValidPlan = true val carbonTable = plan match { // matching the plan based on supported plan @@ -105,8 +109,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // When plan has grouping expression, aggregate expression // subquery case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) // only carbon query plan is supported checking whether logical relation is // is for carbon if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && @@ -124,9 +128,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // below case for handling filter query // When plan has grouping expression, aggregate expression // filter expression - case Aggregate(groupingExp, aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + case Aggregate(groupingExp, + aggregateExp, + Filter(filterExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) // only carbon query plan is supported checking whether logical relation is // is for carbon if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && @@ -167,6 +172,104 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule tableName, list) carbonTable + // case for handling aggregation, order by + case Project(projectList, + Sort(sortOrders, + _, + 
Aggregate(groupingExp, + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + if(isValidPlan) { + list ++ + extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) + } + carbonTable + // case for handling aggregation, order by and filter + case Project(projectList, + Sort(sortOrders, + _, + Aggregate(groupingExp, + aggregateExp, + Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + // TODO need to handle filter predicate subquery scenario +// isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp) + if (isValidPlan) { + list ++ + extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) + filterExp.transform { + case attr: AttributeReference => + list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) + attr + } + } + carbonTable + // case for handling aggregation with order by when only projection column exits + case Sort(sortOrders, + _, + Aggregate(groupingExp, + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + 
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, + carbonTable = carbonTable, + tableName = tableName) + } + carbonTable + // case for handling aggregation with order by and filter when only projection column exits + case Sort(sortOrders, + _, + Aggregate(groupingExp, + aggregateExp, + Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasDataMapSchema => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + // TODO need to handle filter predicate subquery scenario +// isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp) + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, + carbonTable = carbonTable, + tableName = tableName) + filterExp.transform { + case attr: AttributeReference => + list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) + attr + } + } + carbonTable case _ => isValidPlan = false null @@ -176,11 +279,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // getting all the projection columns val listProjectionColumn = list .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) + .toList // getting all the filter columns val listFilterColumn = list .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) + .toList // getting 
all the aggregation columns val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) + .toList // create a query plan object which will be used to select the list of pre aggregate tables // matches with this plan val queryPlan = new QueryPlan(listProjectionColumn.asJava, @@ -190,7 +296,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) // select the list of valid child tables val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() - // if it doesnot match with any pre aggregate table return the same plan + // if it does not match with any pre aggregate table return the same plan if (!selectedDataMapSchemas.isEmpty) { // sort the selected child schema based on size to select smallest pre aggregate table val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore @@ -216,6 +322,48 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** + * Below method will be used to extract columns from order by expression + * @param projectList + * project list from plan + * @param sortOrders + * sort order in plan + * @param carbonTable + * carbon table + * @param tableName + * table name + * @return query columns from expression + */ + def extractQueryColumnForOrderBy(projectList: Option[Seq[NamedExpression]] = None, + sortOrders: Seq[SortOrder], + carbonTable: CarbonTable, + tableName: String): Seq[QueryColumn] = { + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + if(projectList.isDefined) { + projectList.get.map { + proList => + proList.transform { + case attr: AttributeReference => + val queryColumn = getQueryColumn(attr.name, carbonTable, tableName) + if (null != queryColumn) { + list += queryColumn + } + attr + } + } + } + sortOrders.foreach { sortOrder => + sortOrder.child match { + case attr: AttributeReference => + val queryColumn = 
getQueryColumn(attr.name, carbonTable, tableName) + if (null != queryColumn) { + list += queryColumn + } + } + } + list + } + + /** * Below method will be used to get the child attribute reference * based on parent name * @@ -227,12 +375,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * child logical relation * @param aggFunction * aggregation function applied on child + * @param canBeNull + * this is added for strict validation in which case child attribute can be + * null and when it cannot be null * @return child attribute reference */ def getChildAttributeReference(dataMapSchema: DataMapSchema, attributeReference: AttributeReference, attributes: Seq[AttributeReference], - aggFunction: String = ""): AttributeReference = { + aggFunction: String = "", + canBeNull: Boolean = false): AttributeReference = { val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]; val columnSchema = if (aggFunction.isEmpty) { aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase) @@ -242,11 +394,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } // here column schema cannot be null, if it is null then aggregate table selection // logic has some problem - if (null == columnSchema) { + if (!canBeNull && null == columnSchema) { throw new AnalysisException("Column does not exists in Pre Aggregate table") } - // finding the child attribute from child logical relation - attributes.find(p => p.name.equals(columnSchema.getColumnName)).get + if(null == columnSchema && canBeNull) { + null + } else { + // finding the child attribute from child logical relation + attributes.find(p => p.name.equals(columnSchema.getColumnName)).get + } } /** @@ -269,6 +425,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * 3. Filter Expression rules. * 3.1 Updated filter expression attributes with child table attributes * 4. 
Update the Parent Logical relation with child Logical relation + * 5. Order by plan rules. + * 5.1 Update project list based on updated aggregate expression + * 5.2 Update sort order attributes based on pre aggregate table * * @param logicalPlan * parent logical plan @@ -283,6 +442,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule childPlan: LogicalPlan): LogicalPlan = { val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]] logicalPlan.transform { + // case for aggregation query case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation)) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => @@ -297,9 +457,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, newChild) + // case of handling aggregation query with filter case Aggregate(grExp, - aggExp, - Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) + aggExp, + Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = @@ -314,6 +475,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule updatedAggExp, Filter(updatedFilterExpression.get, newChild)) + // case for aggregation query case Aggregate(grExp, aggExp, l: LogicalRelation) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => @@ -328,10 +490,156 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Aggregate(updatedGroupExp, updatedAggExp, newChild) + // case for aggregation query with order by + case Project(_, 
+ Sort(sortOrders, + global, + Aggregate(groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))) + // case for handling aggregation query with filter and order by + case Project(_, + Sort(sortOrders, + global, + Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Project(updatedProjectList, + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, + Filter(updatedFilterExpression.get, newChild)))) + // case for handling aggregation with order by when only projection column exits + case Sort(sortOrders, + global, + Aggregate( + groupingExp, + aggregateExp, + subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + 
.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, None) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + None, + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) + // case for handling aggregation with order by and filter when only projection column exits + case Sort(sortOrders, + global, + Aggregate(groupingExp, + aggregateExp, + Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = + getUpdatedExpressions(groupingExp, + aggregateExp, + subQuery, + Some(expression), + aggDataMapSchema, + attributes, + childPlan) + val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp, + sortOrders, + aggDataMapSchema, + attributes) + Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)) } } /** + * Below method will be used to updated the maintable plan for order by query + * In case of order by we need to update project list and sort order attributes. 
+ * + * @param aggregateExp + * child table aggregate expression + * @param sortOrders + * sort order expression in maintable plan + * @param aggDataMapSchema + * child data map schema + * @param attributes + * child attributes + * @return updated project list and updated sort order + */ + def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression], + sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema, + attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = { + val updatedProjectList = new ArrayBuffer[NamedExpression]() + // getting the updated project list from aggregate expression + aggregateExp.foreach{f => f.transform { + // for projection column + case alias@Alias(attr: AttributeReference, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + // for aggregaton column + case alias@Alias(attr: AggregateExpression, name) => + updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias + } + } + // getting the updated sort order + val updatedSortOrders = sortOrders.map { order => + order.child match { + case attr: AttributeReference => + val childAttribute = getChildAttributeReference(aggDataMapSchema, + attr, + attributes, + canBeNull = true) + // child attribute can be null only in case of alias in query + // so in that case we need to update the sortorder based on new alias + if (null != childAttribute) { + val childExpression = getUpdatedSortOrderExpression(childAttribute, aggregateExp) + SortOrder(childExpression, order.direction) + } else { + val childExpression = getUpdatedSortOrderExpression(attr, aggregateExp) + SortOrder(childExpression, order.direction) + } + } + } + (updatedProjectList, updatedSortOrders) + } + /** * Below method will be used to get the updated expression for pre aggregated table. 
* It will replace the attribute of actual plan with child table attributes. * Updation will be done for below expression. @@ -352,6 +660,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * pre aggregate table schema * @param attributes * pre aggregate table logical relation + * @param aggPlan + * aggregate logical plan * @return tuple of(updated grouping expression, * updated aggregate expression, * updated child logical plan, @@ -363,7 +673,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggDataMapSchema: DataMapSchema, attributes: Seq[AttributeReference], aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan, - Option[Expression]) = { + Option[Expression]) = { // transforming the group by expression attributes with child attributes val updatedGroupExp = groupingExpressions.map { exp => exp.transform { @@ -424,7 +734,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggPlan match { case s: SubqueryAlias => s.child case others => others - } + } } // updating the filter expression if present val updatedFilterExpression = if (filterExpression.isDefined) { @@ -440,6 +750,32 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** + * Below method will be used to get the updated sort order attribute + * based on pre aggregate table + * @param sortOrderAttr + * sort order attributes reference + * @param aggregateExpressions + * aggregate expression + * @return updated sortorder attribute + */ + def getUpdatedSortOrderExpression(sortOrderAttr: AttributeReference, + aggregateExpressions: Seq[NamedExpression]): Expression = { + val updatedExpression = aggregateExpressions collectFirst { + // in case of alias we need to match with alias name and when alias is not present + // we need to compare with attribute reference name + case alias@Alias(attr: AttributeReference, name) + if 
attr.name.equals(sortOrderAttr.name) || name.equals(sortOrderAttr.name) => + AttributeReference(name, + attr.dataType, + attr.nullable, + attr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) + } + // any case it will match the condition, so no need to check whether updated expression is empty + // or not + updatedExpression.get + } + + /** * Below method will be used to get the aggregate expression based on match * Aggregate expression updation rules * 1 Change the count AggregateExpression to Sum as count @@ -532,11 +868,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule "sum")), aggExp.mode, isDistinct = false), - AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, + AggregateExpression(Sum(getChildAttributeReference(dataMapSchema, attr, attributes, - "count"), - LongType)), + "count")), aggExp.mode, isDistinct = false)) // In case of average aggregate function select 2 columns from aggregate table @@ -547,14 +882,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule attr, attributes, "sum"), - changeDataType)), + DoubleType)), aggExp.mode, isDistinct = false), AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema, attr, attributes, "count"), - LongType)), + DoubleType)), aggExp.mode, isDistinct = false)) } @@ -587,21 +922,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * parent carbon table * @param tableName * parent table name - * @param list + * @param set * list of attributes * @return plan is valid */ def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression], aggregateExpressions: Seq[NamedExpression], carbonTable: CarbonTable, tableName: String, - list: scala.collection.mutable.ListBuffer[QueryColumn]): Boolean = { + set: scala.collection.mutable.HashSet[QueryColumn]): Boolean = { aggregateExpressions.map { case attr: AttributeReference => - list += getQueryColumn(attr.name, + set += 
getQueryColumn(attr.name, carbonTable, tableName); case Alias(attr: AttributeReference, _) => - list += getQueryColumn(attr.name, + set += getQueryColumn(attr.name, carbonTable, tableName); case Alias(attr: AggregateExpression, _) => @@ -612,7 +947,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule attr.aggregateFunction, tableName) if (queryColumn.nonEmpty) { - list ++= queryColumn + set ++= queryColumn } else { return false } @@ -741,15 +1076,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule dataType: String = "", isChangedDataType: Boolean = false, isFilterColumn: Boolean = false): QueryColumn = { - val columnSchema = carbonTable.getColumnByName(tableName, - columnName.toLowerCase).getColumnSchema - if (isChangedDataType) { - new QueryColumn(columnSchema, columnSchema.getDataType.getName, - aggFunction.toLowerCase, isFilterColumn) + val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase) + if(null == columnSchema) { + null } else { - new QueryColumn(columnSchema, + if (isChangedDataType) { + new QueryColumn(columnSchema.getColumnSchema, + columnSchema.getDataType.getName, + aggFunction.toLowerCase, + isFilterColumn) + } else { + new QueryColumn(columnSchema.getColumnSchema, CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), aggFunction.toLowerCase, isFilterColumn) + } } } } @@ -867,7 +1207,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan] } else if (version.startsWith("2.2")) { CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan] - } else { + } else { throw new UnsupportedOperationException(s"Spark version $version is not supported") } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark2/src/main/spark2.1/CarbonSessionState.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index 911c25d..c54f3fe 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -20,7 +20,7 @@ import java.lang.reflect.Constructor import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery} @@ -155,7 +155,6 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp catalog.ParquetConversions :: catalog.OrcConversions :: CarbonPreInsertionCasts(sparkSession) :: - CarbonPreAggregateQueryRules(sparkSession) :: CarbonPreAggregateDataLoadingRules :: CarbonIUDAnalysisRule(sparkSession) :: AnalyzeCreateTable(sparkSession) :: @@ -163,22 +162,22 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) { new ResolveDataSource(sparkSession) :: Nil - } else { Nil } - ) + } else { Nil }) } - override lazy val analyzer: Analyzer = { - new Analyzer(catalog, conf) { - override val extendedResolutionRules = - if (extendedAnalyzerRules.nonEmpty) { - extendedAnalyzerRules ++ internalAnalyzerRules - } else { - internalAnalyzerRules - } - override val extendedCheckRules = Seq( - PreWriteCheck(conf, catalog)) - } - } + override lazy val analyzer: Analyzer = + new CarbonAnalyzer(catalog, conf, sparkSession, + new 
Analyzer(catalog, conf) { + override val extendedResolutionRules = + if (extendedAnalyzerRules.nonEmpty) { + extendedAnalyzerRules ++ internalAnalyzerRules + } else { + internalAnalyzerRules + } + override val extendedCheckRules = Seq( + PreWriteCheck(conf, catalog)) + } + ) /** * Internal catalog for managing table and database states. @@ -195,6 +194,16 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp } } +class CarbonAnalyzer(catalog: SessionCatalog, + conf: CatalystConf, + sparkSession: SparkSession, + analyzer: Analyzer) extends Analyzer(catalog, conf) { + override def execute(plan: LogicalPlan): LogicalPlan = { + val logicalPlan = analyzer.execute(plan) + CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) + } +} + class CarbonOptimizer( catalog: SessionCatalog, conf: SQLConf,
