Indhumathi27 commented on code in PR #4257: URL: https://github.com/apache/carbondata/pull/4257#discussion_r849499860
########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala: ########## @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Coalesce, Expression, Literal, ScalaUDF} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average} -import org.apache.spark.sql.catalyst.plans.logical.{Join, Limit, LogicalPlan, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Sort} Review Comment: looks like unused import ########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala: ########## @@ -565,9 +589,7 @@ case class CarbonCreateMVCommand( logicalPlan.transformAllExpressions { case alias: Alias => alias case aggregate: AggregateExpression => - // If average function present then go for full refresh Review Comment: i think, now this CASE becomes invalid. you can remove it ########## integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala: ########## @@ -800,7 +826,65 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan, val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper] val plan = planWrapper.modularPlan.asInstanceOf[Select] val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan) - val outputListMapping = groupBy.outputList zip updatedPlanOutputList + // columnIndex is used to iterate over updatedPlanOutputList. For each avg attribute, + // updatedPlanOutputList has 2 attributes (sum and count) and + // by maintaining index we can increment and access when needed. 
+ var columnIndex = -1 + + def getColumnName(expression: Expression): String = { + if (expression.isInstanceOf[AttributeReference]) { Review Comment: can be replaced with CASE pattern match ########## integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala: ########## @@ -28,20 +28,22 @@ import scala.util.control.Breaks.{break, breakable} import org.apache.log4j.Logger import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession} import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.types.{DataType, DataTypes} +import org.apache.spark.sql.parser.MVQueryParser Review Comment: remove unused import ########## integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala: ########## @@ -800,7 +826,65 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan, val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper] val plan = planWrapper.modularPlan.asInstanceOf[Select] val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan) - val outputListMapping = groupBy.outputList zip updatedPlanOutputList + // columnIndex is used to iterate over updatedPlanOutputList. For each avg attribute, + // updatedPlanOutputList has 2 attributes (sum and count) and + // by maintaining index we can increment and access when needed. 
+ var columnIndex = -1 + + def getColumnName(expression: Expression): String = { + if (expression.isInstanceOf[AttributeReference]) { + expression.asInstanceOf[AttributeReference].name + } else if (expression.isInstanceOf[Literal]) { + expression.asInstanceOf[Literal].value.toString + } else { + "" + } + } + // get column from list having the given aggregate and column name. + def getColumnFromOutputList(updatedPlanOutputList: Seq[NamedExpression], aggregate: String, + colName: String): NamedExpression = { + val nextIndex = columnIndex + 1 + if ((nextIndex) < updatedPlanOutputList.size && + updatedPlanOutputList(nextIndex).name.contains(aggregate) && + updatedPlanOutputList(nextIndex).name.contains(colName)) { + columnIndex += 1 + updatedPlanOutputList(columnIndex) + } else { + updatedPlanOutputList.find(x => x.name.contains(aggregate) && + x.name.contains(colName)).get + } + } + val outputListMapping = if (groupBy.outputList.exists(_.sql.contains("avg("))) { + // for each avg attribute, updatedPlanOutputList has 2 attributes (sum and count), + // so direct mapping of groupBy.outputList and updatedPlanOutputList is not possible. + // If query has avg, then get the sum, count attributes in the list and map accordingly. 
+ for (exp <- groupBy.outputList) yield { + exp match { + case Alias(aggregateExpression: AggregateExpression, _) + if aggregateExpression.aggregateFunction.isInstanceOf[Average] => + val colName = getColumnName(aggregateExpression.collectLeaves().head) + val sumAttr = getColumnFromOutputList(updatedPlanOutputList, "sum", colName) Review Comment: can use constants already defined in CarbonCommonConstants ########## integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala: ########## @@ -800,7 +826,65 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan, val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper] val plan = planWrapper.modularPlan.asInstanceOf[Select] val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan) - val outputListMapping = groupBy.outputList zip updatedPlanOutputList + // columnIndex is used to iterate over updatedPlanOutputList. For each avg attribute, + // updatedPlanOutputList has 2 attributes (sum and count) and + // by maintaining index we can increment and access when needed. + var columnIndex = -1 + + def getColumnName(expression: Expression): String = { + if (expression.isInstanceOf[AttributeReference]) { + expression.asInstanceOf[AttributeReference].name + } else if (expression.isInstanceOf[Literal]) { + expression.asInstanceOf[Literal].value.toString + } else { + "" + } + } + // get column from list having the given aggregate and column name. 
+ def getColumnFromOutputList(updatedPlanOutputList: Seq[NamedExpression], aggregate: String, + colName: String): NamedExpression = { + val nextIndex = columnIndex + 1 + if ((nextIndex) < updatedPlanOutputList.size && + updatedPlanOutputList(nextIndex).name.contains(aggregate) && + updatedPlanOutputList(nextIndex).name.contains(colName)) { + columnIndex += 1 + updatedPlanOutputList(columnIndex) + } else { + updatedPlanOutputList.find(x => x.name.contains(aggregate) && + x.name.contains(colName)).get + } + } + val outputListMapping = if (groupBy.outputList.exists(_.sql.contains("avg("))) { Review Comment: can use constants already defined in CarbonCommonConstants -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org