[CARBONDATA-1925][Pre-Aggregate] Added code to support CASE expressions. Added code to support expressions inside aggregation functions for pre-aggregate tables.
This closes #1694 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/adb8c135 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/adb8c135 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/adb8c135 Branch: refs/heads/branch-1.3 Commit: adb8c1356d0b753ecefe132cf5193ea3f2f92dea Parents: c5e72a4 Author: kumarvishal <[email protected]> Authored: Wed Dec 20 15:46:02 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Fri Dec 29 22:39:44 2017 +0800 ---------------------------------------------------------------------- .../schema/table/AggregationDataMapSchema.java | 4 + .../TestPreAggregateExpressions.scala | 102 ++++++ .../command/carbonTableSchemaCommon.scala | 50 ++- .../apache/spark/sql/CarbonExpressions.scala | 13 + .../preaaggregate/PreAggregateUtil.scala | 313 ++++++++++++------- .../command/timeseries/TimeseriesUtil.scala | 8 +- .../sql/hive/CarbonPreAggregateRules.scala | 8 +- 7 files changed, 364 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java index 8f6a2d3..4b2d492 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java @@ -329,6 +329,10 @@ public class AggregationDataMapSchema extends DataMapSchema { return false; } } + } else { + // in case of any expression one column can be derived from multiple column + 
// in that case we cannot do rollup so hit the maintable + return false; } } return true; http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala new file mode 100644 index 0000000..4171690 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("DROP TABLE IF EXISTS mainTable") + sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + } + test("test pre agg create table with expression 1") { + sql( + s""" + | CREATE DATAMAP agg0 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | count(age) + | FROM mainTable GROUP BY name + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count") + } + + test("test pre agg create table with expression 2") { + sql( + s""" + | CREATE DATAMAP agg1 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | sum(CASE WHEN age=35 THEN id ELSE 0 END) + | FROM mainTable GROUP BY name + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 3") { + sql( + s""" + | CREATE DATAMAP agg2 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | sum(CASE WHEN age=35 THEN id ELSE 0 END), + | city + | FROM mainTable GROUP BY name,city + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 4") { + sql( + s""" + | CREATE DATAMAP agg3 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | sum(CASE WHEN age=27 THEN id ELSE 0 END) + | FROM mainTable GROUP BY name + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum") + } + + test("test pre agg create table with expression 5") { + sql( + s""" + | CREATE DATAMAP agg4 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | sum(CASE 
WHEN age=27 THEN id ELSE 0 END), + | SUM(CASE WHEN age=35 THEN id ELSE 0 END) + | FROM mainTable GROUP BY name + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum") + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum") + } + + test("test pre agg create table with expression 6") { + sql( + s""" + | CREATE DATAMAP agg5 ON TABLE mainTable USING 'preaggregate' AS + | SELECT name, + | COUNT(CASE WHEN age=27 THEN(CASE WHEN name='eason' THEN id ELSE 0 END) ELSE 0 END) + | FROM mainTable GROUP BY name + | """.stripMargin) + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count") + } + + override def afterAll: Unit = { + sql("DROP TABLE IF EXISTS mainTable") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index c7a7b69..1e368cf 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -79,7 +79,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri } case class DataMapField(var aggregateFunction: String = "", - columnTableRelation: Option[ColumnTableRelation] = None) { + columnTableRelationList: Option[Seq[ColumnTableRelation]] = None) { } case class ColumnTableRelation(parentColumnName: String, parentColumnId: String, @@ -435,14 +435,21 @@ class TableNewProcessor(cm: TableModel) { if(isParentColumnRelation) 
{ val dataMapField = map.get.get(field).get columnSchema.setFunction(dataMapField.aggregateFunction) - val relation = dataMapField.columnTableRelation.get - val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation] - val relationIdentifier = new RelationIdentifier( - relation.parentDatabaseName, relation.parentTableName, relation.parentTableId) - val parentColumnTableRelation = new ParentColumnTableRelation( - relationIdentifier, relation.parentColumnId, relation.parentColumnName) - parentColumnTableRelationList.add(parentColumnTableRelation) - columnSchema.setParentColumnTableRelations(parentColumnTableRelationList) + val columnRelationList = dataMapField.columnTableRelationList.get + val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation] + columnRelationList.foreach { + columnRelation => + val relationIdentifier = new RelationIdentifier( + columnRelation.parentDatabaseName, + columnRelation.parentTableName, + columnRelation.parentTableId) + val parentColumnTableRelation = new ParentColumnTableRelation( + relationIdentifier, + columnRelation.parentColumnId, + columnRelation.parentColumnName) + parentColumnTableRelationList.add(parentColumnTableRelation) + } + columnSchema.setParentColumnTableRelations(parentColumnTableRelationList) } // TODO: Need to fill RowGroupID, converted type // & Number of Children after DDL finalization @@ -467,10 +474,11 @@ class TableNewProcessor(cm: TableModel) { // Sort columns should be at the begin of all columns cm.sortKeyDims.get.foreach { keyDim => val field = cm.dimCols.find(keyDim equals _.column).get - val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) { + val encoders = if (getEncoderFromParent(field)) { cm.parentTable.get.getColumnByName( cm.parentTable.get.getTableName, - cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder + cm.dataMapRelation.get.get(field).get.columnTableRelationList. 
+ get(0).parentColumnName).getEncoder } else { val encoders = new java.util.ArrayList[Encoding]() encoders.add(Encoding.DICTIONARY) @@ -491,12 +499,11 @@ class TableNewProcessor(cm: TableModel) { cm.dimCols.foreach { field => val sortField = cm.sortKeyDims.get.find(field.column equals _) if (sortField.isEmpty) { - val encoders = if (cm.parentTable.isDefined && - cm.dataMapRelation.get.get(field).isDefined) { + val encoders = if (getEncoderFromParent(field)) { cm.parentTable.get.getColumnByName( cm.parentTable.get.getTableName, cm.dataMapRelation.get.get(field).get. - columnTableRelation.get.parentColumnName).getEncoder + columnTableRelationList.get(0).parentColumnName).getEncoder } else { val encoders = new java.util.ArrayList[Encoding]() encoders.add(Encoding.DICTIONARY) @@ -524,14 +531,14 @@ class TableNewProcessor(cm: TableModel) { var isAggFunPresent = false // getting the encoder from maintable so whatever encoding is applied in maintable // same encoder can be applied on aggregate table - val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) { + val encoders = if (getEncoderFromParent(field)) { isAggFunPresent = cm.dataMapRelation.get.get(field).get.aggregateFunction.equalsIgnoreCase("sum") || cm.dataMapRelation.get.get(field).get.aggregateFunction.equals("avg") if(!isAggFunPresent) { cm.parentTable.get.getColumnByName( cm.parentTable.get.getTableName, - cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName) + cm.dataMapRelation.get.get(field).get.columnTableRelationList.get(0).parentColumnName) .getEncoder } else { new java.util.ArrayList[Encoding]() @@ -668,6 +675,17 @@ class TableNewProcessor(cm: TableModel) { tableInfo } + /** + * Method to check to get the encoder from parent or not + * @param field column field + * @return get encoder from parent + */ + private def getEncoderFromParent(field: Field) : Boolean = { + cm.parentTable.isDefined && + cm.dataMapRelation.get.get(field).isDefined && 
+ cm.dataMapRelation.get.get(field).get.columnTableRelationList.size==1 + } + // For checking if the specified col group columns are specified in fields list. protected def checkColGroupsValidity(colGrps: Seq[String], allCols: Seq[ColumnSchema], http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala index c1f9e8a..d473bc4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala @@ -47,6 +47,19 @@ object CarbonExpressions { } /** + * unapply method of Cast class with expression. + */ + object MatchCastExpression { + def unapply(expr: Expression): Option[(Expression, DataType)] = { + expr match { + case a: Cast if a.child.isInstanceOf[Expression] => + Some((a.child.asInstanceOf[Expression], a.dataType)) + case _ => None + } + } + } + + /** * unapply method of Describe Table format. 
*/ object CarbonDescribeTable { http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 1f5bd41..217436d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -19,20 +19,19 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession} -import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession} +import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCast => Cast, MatchCastExpression} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, NamedExpression, ScalaUDF} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import 
org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation -import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} -import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.types.DataType import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -116,29 +115,45 @@ object PreAggregateUtil { throw new MalformedCarbonCommandException( "Pre Aggregation is not supported on Pre-Aggregated Table") } + var counter = 0 aggExp.map { - case Alias(attr: AggregateExpression, _) => + case Alias(attr: AggregateExpression, name) => if (attr.isDistinct) { throw new MalformedCarbonCommandException( "Distinct is not supported On Pre Aggregation") } - fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable, + fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields( + carbonTable, attr.aggregateFunction, parentTableName, parentDatabaseName, - parentTableId) + parentTableId, + "column_" + counter) + counter = counter + 1 case attr: AttributeReference => - fieldToDataMapFieldMap += getField(attr.name, + val columnRelation = getColumnRelation( + attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + fieldToDataMapFieldMap += createField( + attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = Seq(columnRelation)) case Alias(attr: AttributeReference, _) => - fieldToDataMapFieldMap += getField(attr.name, + val columnRelation = getColumnRelation( + attr.name, + parentTableId, + parentTableName, + parentDatabaseName, + carbonTable) + 
fieldToDataMapFieldMap += createField( + attr.name, attr.dataType, - parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelationList = Seq(columnRelation)) case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") => case _ => throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ @@ -148,6 +163,34 @@ object PreAggregateUtil { } /** + * Below method will be used to get the column relation + * with the parent column which will be used during query and data loading + * @param parentColumnName + * parent column name + * @param parentTableId + * parent column id + * @param parentTableName + * parent table name + * @param parentDatabaseName + * parent database name + * @param carbonTable + * carbon table + * @return column relation object + */ + def getColumnRelation(parentColumnName: String, + parentTableId: String, + parentTableName: String, + parentDatabaseName: String, + carbonTable: CarbonTable) : ColumnTableRelation = { + val parentColumnId = carbonTable.getColumnByName(parentTableName, parentColumnName).getColumnId + val columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName, + parentColumnId = parentColumnId, + parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelation + } + + /** * Below method will be used to validate about the aggregate function * which is applied on select query. 
* Currently sum, max, min, count, avg is supported @@ -155,102 +198,105 @@ object PreAggregateUtil { * In case of avg it will return two fields one for count * and other of sum of that column to support rollup * - * @param carbonTable - * @param aggFunctions - * @param parentTableName - * @param parentDatabaseName - * @param parentTableId + * @param carbonTable parent carbon table + * @param aggFunctions aggregation function + * @param parentTableName parent table name + * @param parentDatabaseName parent database name + * @param parentTableId parent column id + * @param newColumnName + * In case of any expression this will be used as a column name for pre aggregate * @return list of fields */ def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable, aggFunctions: AggregateFunction, parentTableName: String, parentDatabaseName: String, - parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + parentTableId: String, + newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)] aggFunctions match { - case sum@Sum(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += createFieldForAggregateExpression( + exp, changeDataType, - sum.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(attr: AttributeReference)) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, 
attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case count@Count(Seq(Cast(attr: AttributeReference, _))) => - list += getField(attr.name, - attr.dataType, - count.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + sum.prettyName) + case sum@Sum(exp: Expression) => + list += createFieldForAggregateExpression( + exp, + sum.dataType, + carbonTable, + newColumnName, + sum.prettyName) + case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) => + list += createFieldForAggregateExpression( + exp, changeDataType, - min.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - max.prettyName, - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + count.prettyName) + case count@Count(Seq(exp: Expression)) => + list += createFieldForAggregateExpression( + exp, + count.dataType, + carbonTable, + newColumnName, + count.prettyName) + case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += createFieldForAggregateExpression( + exp, changeDataType, - max.prettyName, - 
carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case Average(attr: AttributeReference) => - list += getField(attr.name, - attr.dataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, - attr.dataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - case Average(Cast(attr: AttributeReference, changeDataType: DataType)) => - list += getField(attr.name, + carbonTable, + newColumnName, + min.prettyName) + case min@Min(exp: Expression) => + list += createFieldForAggregateExpression( + exp, + min.dataType, + carbonTable, + newColumnName, + min.prettyName) + case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += createFieldForAggregateExpression( + exp, changeDataType, - "sum", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) - list += getField(attr.name, + carbonTable, + newColumnName, + max.prettyName) + case max@Max(exp: Expression) => + list += createFieldForAggregateExpression( + exp, + max.dataType, + carbonTable, + newColumnName, + max.prettyName) + case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + list += createFieldForAggregateExpression( + exp, changeDataType, - "count", - carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, - parentTableName, - parentDatabaseName, parentTableId = parentTableId) + carbonTable, + newColumnName, + "sum") + list += createFieldForAggregateExpression( + exp, + changeDataType, + carbonTable, + newColumnName, + "count") + case avg@Average(exp: Expression) => + list += createFieldForAggregateExpression( + exp, + avg.dataType, + 
carbonTable, + newColumnName, + "sum") + list += createFieldForAggregateExpression( + exp, + avg.dataType, + carbonTable, + newColumnName, + "count") case others@_ => throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${ others.prettyName}") @@ -258,35 +304,79 @@ object PreAggregateUtil { } /** + * Below method will be used to get the field and its data map field object + * for aggregate expression + * @param expression + * expression in aggregate function + * @param dataType + * data type + * @param carbonTable + * parent carbon table + * @param newColumnName + * column name of aggregate table + * @param aggregationName + * aggregate function name + * @return field and its metadata tuple + */ + def createFieldForAggregateExpression( + expression: Expression, + dataType: DataType, + carbonTable: CarbonTable, + newColumnName: String, + aggregationName: String): (Field, DataMapField) = { + val parentColumnsName = new ArrayBuffer[String]() + expression.transform { + case attr: AttributeReference => + parentColumnsName += attr.name + attr + } + val arrayBuffer = parentColumnsName.map { name => + getColumnRelation(name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + } + // if parent column relation is of size more than one that means aggregate table + // column is derived from multiple column of main table + // or if expression is not a instance of attribute reference + // then use column name which is passed + val columnName = + if (parentColumnsName.size > 1 && !expression.isInstanceOf[AttributeReference]) { + newColumnName + } else { + expression.asInstanceOf[AttributeReference].name + } + createField(columnName, + dataType, + aggregationName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + arrayBuffer) 
+ } + + /** * Below method will be used to get the fields object for pre aggregate table * * @param columnName * @param dataType * @param aggregateType - * @param parentColumnId * @param parentTableName - * @param parentDatabaseName - * @param parentTableId + * @param columnTableRelationList + * List of column relation with parent * @return fields object */ - def getField(columnName: String, + def createField(columnName: String, dataType: DataType, aggregateType: String = "", - parentColumnId: String, parentTableName: String, - parentDatabaseName: String, - parentTableId: String): (Field, DataMapField) = { + columnTableRelationList: Seq[ColumnTableRelation]): (Field, DataMapField) = { val actualColumnName = if (aggregateType.equals("")) { parentTableName + '_' + columnName } else { parentTableName + '_' + columnName + '_' + aggregateType } val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName - val columnTableRelation = ColumnTableRelation(parentColumnName = columnName, - parentColumnId = parentColumnId, - parentTableName = parentTableName, - parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) - val dataMapField = DataMapField(aggregateType, Some(columnTableRelation)) + val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList)) if (dataType.typeName.startsWith("decimal")) { val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString) (Field(column = actualColumnName, @@ -508,7 +598,10 @@ object PreAggregateUtil { val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase( dataMapIdentifier.table)) match { case Some(dataMapSchema) => - dataMapSchema.getChildSchema.getListOfColumns.asScala.sortBy(_.getSchemaOrdinal).map( + val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala + .filter{column => + !column.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)} + columns.sortBy(_.getSchemaOrdinal).map( _.getColumnName).mkString(",") case 
None => throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala index 6a4ef56..d4358b6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala @@ -111,8 +111,8 @@ object TimeSeriesUtil { .LinkedHashMap[Field, DataMapField], timeSeriesColumn: String) : Any = { val isTimeSeriesColumnExits = fieldMapping - .exists(obj => obj._2.columnTableRelation.isDefined && - obj._2.columnTableRelation.get.parentColumnName + .exists(obj => obj._2.columnTableRelationList.isDefined && + obj._2.columnTableRelationList.get(0).parentColumnName .equalsIgnoreCase(timeSeriesColumn) && obj._2.aggregateFunction.isEmpty) if(!isTimeSeriesColumnExits) { @@ -134,8 +134,8 @@ object TimeSeriesUtil { timeSeriesColumn: String, timeSeriesFunction: String) : Any = { val isTimeSeriesColumnExits = fieldMapping - .find(obj => obj._2.columnTableRelation.isDefined && - obj._2.columnTableRelation.get.parentColumnName + .find(obj => obj._2.columnTableRelationList.isDefined && + obj._2.columnTableRelationList.get(0).parentColumnName .equalsIgnoreCase(timeSeriesColumn) && obj._2.aggregateFunction.isEmpty) isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 76c39a4..79561c6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ -import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast} +import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} @@ -1290,11 +1290,11 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { attrExpression.aggregateFunction match { case Sum(attr: AttributeReference) => (attr.name + "_sum", alias) :: Nil - case Sum(MatchCast(attr: AttributeReference, _)) => + case Sum(MatchCastExpression(attr: AttributeReference, _)) => (attr.name + "_sum", alias) :: Nil case Count(Seq(attr: AttributeReference)) => (attr.name + "_count", alias) :: Nil - case Count(Seq(MatchCast(attr: AttributeReference, _))) => + case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) => (attr.name + "_count", alias) :: Nil case Average(attr: AttributeReference) => Seq((attr.name + "_sum", Alias(attrExpression. @@ -1303,7 +1303,7 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { (attr.name, Alias(attrExpression. 
copy(aggregateFunction = Count(attr), resultId = NamedExpression.newExprId), attr.name + "_count")())) - case Average(cast@MatchCast(attr: AttributeReference, _)) => + case Average(cast@MatchCastExpression(attr: AttributeReference, _)) => Seq((attr.name + "_sum", Alias(attrExpression. copy(aggregateFunction = Sum(cast), resultId = NamedExpression.newExprId),
