[CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Expression support inside aggregate function for Query
Support transforming of query plan for aggregate table when query aggregate function contains any expression Support sub query in Preaggregate table This closes #1728 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c70e73f1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c70e73f1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c70e73f1 Branch: refs/heads/branch-1.3 Commit: c70e73f11b281cde9e8310d5a2d5640c75c30665 Parents: af4277e Author: kumarvishal <[email protected]> Authored: Mon Dec 25 15:04:39 2017 +0530 Committer: ravipesala <[email protected]> Committed: Mon Jan 8 11:52:43 2018 +0530 ---------------------------------------------------------------------- .../schema/table/AggregationDataMapSchema.java | 51 +- .../core/preagg/AggregateQueryPlan.java | 48 + .../core/preagg/AggregateTableSelector.java | 33 +- .../carbondata/core/preagg/QueryColumn.java | 27 +- .../carbondata/core/preagg/QueryPlan.java | 59 - .../TestPreAggregateExpressions.scala | 68 + .../TestPreAggregateTableSelection.scala | 5 + .../TestPreAggregateWithSubQuery.scala | 88 + .../preaaggregate/PreAggregateUtil.scala | 203 ++- .../sql/hive/CarbonPreAggregateRules.scala | 1633 ++++++++---------- .../src/main/spark2.1/CarbonSessionState.scala | 2 +- .../src/main/spark2.2/CarbonSessionState.scala | 2 +- 12 files changed, 1101 insertions(+), 1118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java index 4b2d492..e061812 100644 
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java @@ -62,6 +62,8 @@ public class AggregationDataMapSchema extends DataMapSchema { */ private int ordinal = Integer.MAX_VALUE; + private Set aggExpToColumnMapping; + public AggregationDataMapSchema(String dataMapName, String className) { super(dataMapName, className); } @@ -203,23 +205,6 @@ public class AggregationDataMapSchema extends DataMapSchema { } /** - * Below method is to check if parent column with matching aggregate function - * @param parentColumnName - * parent column name - * @param aggFunction - * aggregate function - * @return is matching - */ - public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction) { - Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName); - if (null != aggFunctions && aggFunctions.contains(aggFunction)) { - return true; - } - return false; - } - - - /** * Method to prepare mapping of parent to list of aggregation function applied on that column * @param listOfColumns * child column schema list @@ -341,4 +326,36 @@ public class AggregationDataMapSchema extends DataMapSchema { public int getOrdinal() { return ordinal; } + + /** + * Below method will be used to get the aggregation column based on index + * It will return the first aggregation column found based on index + * @param searchStartIndex + * start index + * @param sortedColumnSchema + * list of sorted table columns + * @return found column list + * + */ + public ColumnSchema getAggColumnBasedOnIndex(int searchStartIndex, + List<ColumnSchema> sortedColumnSchema) { + ColumnSchema columnSchema = null; + for (int i = searchStartIndex; i < sortedColumnSchema.size(); i++) { + if (!sortedColumnSchema.get(i).getAggFunction().isEmpty()) { + columnSchema = sortedColumnSchema.get(i); + break; + } + } + return 
columnSchema; + } + + public synchronized Set getAggExpToColumnMapping() { + return aggExpToColumnMapping; + } + + public synchronized void setAggExpToColumnMapping(Set aggExpToColumnMapping) { + if (null == this.aggExpToColumnMapping) { + this.aggExpToColumnMapping = aggExpToColumnMapping; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java new file mode 100644 index 0000000..e895bde --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.preagg; + +import java.util.List; + +/** + * class to maintain the query plan to select the data map tables + */ +public class AggregateQueryPlan { + + /** + * List of projection columns + */ + private List<QueryColumn> projectionColumn; + + /** + * list of filter columns + */ + private List<QueryColumn> filterColumns; + + public AggregateQueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> filterColumns) { + this.projectionColumn = projectionColumn; + this.filterColumns = filterColumns; + } + + public List<QueryColumn> getProjectionColumn() { + return projectionColumn; + } + + public List<QueryColumn> getFilterColumns() { + return filterColumns; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java index 5347567..79d0904 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java @@ -36,15 +36,15 @@ public class AggregateTableSelector { /** * current query plan */ - private QueryPlan queryPlan; + private AggregateQueryPlan aggregateQueryPlan; /** * parent table */ private CarbonTable parentTable; - public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) { - this.queryPlan = queryPlan; + public AggregateTableSelector(AggregateQueryPlan aggregateQueryPlan, CarbonTable parentTable) { + this.aggregateQueryPlan = aggregateQueryPlan; this.parentTable = parentTable; } @@ -58,9 +58,8 @@ public class AggregateTableSelector { * @return selected pre aggregate table schema */ public List<DataMapSchema> selectPreAggDataMapSchema() { - 
List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn(); - List<QueryColumn> aggColumns = queryPlan.getAggregationColumns(); - List<QueryColumn> filterColumns = queryPlan.getFilterColumns(); + List<QueryColumn> projectionColumn = aggregateQueryPlan.getProjectionColumn(); + List<QueryColumn> filterColumns = aggregateQueryPlan.getFilterColumns(); List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList(); List<DataMapSchema> selectedDataMapSchema = new ArrayList<>(); boolean isMatch; @@ -74,6 +73,7 @@ public class AggregateTableSelector { getColumnSchema(queryColumn, aggregationDataMapSchema); if (null == columnSchemaByParentName) { isMatch = false; + break; } } if (isMatch) { @@ -99,6 +99,7 @@ public class AggregateTableSelector { getColumnSchema(queryColumn, aggregationDataMapSchema); if (null == columnSchemaByParentName) { isMatch = false; + break; } } if (isMatch) { @@ -110,26 +111,6 @@ public class AggregateTableSelector { return selectedDataMapSchema; } } - // match aggregation columns - if (null != aggColumns && !aggColumns.isEmpty()) { - List<DataMapSchema> dmSchemaToIterate = - selectedDataMapSchema.isEmpty() ? 
dataMapSchemaList : selectedDataMapSchema; - selectedDataMapSchema = new ArrayList<>(); - for (DataMapSchema dmSchema : dmSchemaToIterate) { - isMatch = true; - for (QueryColumn queryColumn : aggColumns) { - AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema; - if (!aggregationDataMapSchema - .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(), - queryColumn.getAggFunction())) { - isMatch = false; - } - } - if (isMatch) { - selectedDataMapSchema.add(dmSchema); - } - } - } return selectedDataMapSchema; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java index c91a703..42daa6c 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java @@ -30,16 +30,6 @@ public class QueryColumn { private ColumnSchema columnSchema; /** - * to store the change data type in case of cast - */ - private String changedDataType; - - /** - * aggregation function applied - */ - private String aggFunction; - - /** * is filter column */ private boolean isFilterColumn; @@ -49,11 +39,8 @@ public class QueryColumn { */ private String timeseriesFunction; - public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction, - boolean isFilterColumn, String timeseriesFunction) { + public QueryColumn(ColumnSchema columnSchema, boolean isFilterColumn, String timeseriesFunction) { this.columnSchema = columnSchema; - this.changedDataType = changedDataType; - this.aggFunction = aggFunction; this.isFilterColumn = isFilterColumn; this.timeseriesFunction = timeseriesFunction; } @@ -62,14 +49,6 @@ public class QueryColumn { 
return columnSchema; } - public String getChangedDataType() { - return changedDataType; - } - - public String getAggFunction() { - return aggFunction; - } - public boolean isFilterColumn() { return isFilterColumn; } @@ -92,9 +71,6 @@ public class QueryColumn { if (!columnSchema.equals(that.columnSchema)) { return false; } - if (!(aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null)) { - return false; - } return timeseriesFunction != null ? timeseriesFunction.equals(that.timeseriesFunction) : that.timeseriesFunction == null; @@ -102,7 +78,6 @@ public class QueryColumn { @Override public int hashCode() { int result = columnSchema.hashCode(); - result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0); result = 31 * result + (timeseriesFunction != null ? timeseriesFunction.hashCode() : 0); result = 31 * result + (isFilterColumn ? 1 : 0); return result; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java deleted file mode 100644 index 21a34fa..0000000 --- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.preagg; - -import java.util.List; - -/** - * class to maintain the query plan to select the data map tables - */ -public class QueryPlan { - - /** - * List of projection columns - */ - private List<QueryColumn> projectionColumn; - - /** - * list of aggregation columns - */ - private List<QueryColumn> aggregationColumns; - - /** - * list of filter columns - */ - private List<QueryColumn> filterColumns; - - public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns, - List<QueryColumn> filterColumns) { - this.projectionColumn = projectionColumn; - this.aggregationColumns = aggregationColumns; - this.filterColumns = filterColumns; - } - - public List<QueryColumn> getProjectionColumn() { - return projectionColumn; - } - - public List<QueryColumn> getAggregationColumns() { - return aggregationColumns; - } - - public List<QueryColumn> getFilterColumns() { - return filterColumns; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala index 4171690..0b22c56 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala @@ -17,6 +17,10 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -25,6 +29,7 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { sql("DROP TABLE IF EXISTS mainTable") sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") } test("test pre agg create table with expression 1") { sql( @@ -94,6 +99,69 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { | """.stripMargin) checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count") } + test("test pre agg table selection with expression 1") { + val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0") + } + + + test("test pre agg table selection with expression 2") { + val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1") + } + + test("test pre agg table selection with expression 3") { + val df = sql("select sum(case when age=35 then id else 0 end) from 
maintable") + checkAnswer(df, Seq(Row(6.0))) + } + + test("test pre agg table selection with expression 4") { + val df = sql("select sum(case when age=27 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3") + checkAnswer(df, Seq(Row(2.0))) + } + + test("test pre agg table selection with expression 5") { + val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4") + checkAnswer(df, Seq(Row(2.0,6.0))) + } + + /** + * Below method will be used to validate the table name is present in the plan or not + * @param plan + * query plan + * @param actualTableName + * table name to be validated + */ + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ + var isValidPlan = false + plan.transform { + // first check if any preaTable1 scala function is applied it is present is in plan + // then call is from create preaTable1regate table class so no need to transform the query plan + case ca:CarbonRelation => + if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) { + val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation] + if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { + isValidPlan = true + } + } + ca + case logicalRelation:LogicalRelation => + if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) { + val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] + if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { + isValidPlan = true + } + } + logicalRelation + } + if(!isValidPlan) { + assert(false) + } else { + assert(true) + } + } override def afterAll: Unit = { sql("DROP TABLE IF EXISTS mainTable") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index fdeb2bd..322827e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -230,6 +230,11 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { checkAnswer(df, Row("vishal", 29)) } + test("test PreAggregate table selection 29") { + val df = sql("select sum(id) from mainTable group by name") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2") + } + override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists lineitem") http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala new file mode 100644 index 0000000..0f87803 --- /dev/null +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + + +class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("drop table if exists mainTable1") + sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + sql("CREATE TABLE mainTable1(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable") + sql(s"LOAD DATA LOCAL INPATH
'$resourcesPath/measureinsertintotest.csv' into table mainTable1") + } + + test("test sub query PreAggregate table selection 1") { + val df = sql("select t2.newnewname as newname from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname group by t2.newnewname") + matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0") + } + + test("test sub query PreAggregate table selection 2") { + val df = sql("select t1.name,t1.city from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname") + matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0") + } + + test("test sub query PreAggregate table selection 3") { + val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname") + matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0") + } + + test("test sub query PreAggregate table selection 4") { + val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.name group by t1.name, t2.sum") + matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0") + } + + /** + * Below method will be used to collect all the logical relation from logical plan + * @param logicalPlan + * query logical plan + * @return all the logical relation + */ + def collectLogicalRelation(logicalPlan: LogicalPlan) : Seq[LogicalRelation] = { + logicalPlan.collect{ + case l:LogicalRelation => l + } + } + + /** + * Asserts that at least one Carbon relation in the given list reads the given table + * @param logicalRelations + * logical relations collected from the analyzed plan + * @param tableName + * expected table name, compared case-insensitively + */ + def matchTable(logicalRelations: Seq[LogicalRelation], tableName: String) { + assert(logicalRelations.exists { + case l:LogicalRelation if
l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.getTableName. + equalsIgnoreCase(tableName) + // any other relation type is not a match; without this case exists() would throw a MatchError + case _ => false + }) + } + + override def afterAll: Unit = { + sql("drop table if exists mainTable") + sql("drop table if exists mainTable1") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 217436d..86d0c6a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -20,18 +20,18 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession} -import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCast => Cast, MatchCastExpression} +import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, NamedExpression, ScalaUDF} -import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF} +import
org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{DataType, LongType} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -85,14 +85,10 @@ object PreAggregateUtil { /** * Below method will be used to get the fields from expressions - * @param groupByExp - * grouping expression - * @param aggExp - * aggregate expression - * @param logicalRelation - * logical relation - * @param selectStmt - * select statement + * @param groupByExp grouping expression + * @param aggExp aggregate expression + * @param logicalRelation logical relation + * @param selectStmt select statement * @return fields from expressions */ def getFieldsFromPlan(groupByExp: Seq[Expression], @@ -165,16 +161,11 @@ object PreAggregateUtil { /** * Below method will be used to get the column relation * with the parent column which will be used during query and data loading - * @param parentColumnName - * parent column name - * @param parentTableId - * parent column id - * @param parentTableName - * parent table name - * @param parentDatabaseName - * parent database name - * @param carbonTable - * carbon table + * @param parentColumnName parent column name + * @param parentTableId parent column id + * @param parentTableName parent table name + * @param parentDatabaseName parent database name + * @param carbonTable carbon table * @return column relation object */ def getColumnRelation(parentColumnName: String, @@ -280,7 +271,7 @@ object 
PreAggregateUtil { "sum") list += createFieldForAggregateExpression( exp, - changeDataType, + LongType, carbonTable, newColumnName, "count") @@ -293,7 +284,7 @@ object PreAggregateUtil { "sum") list += createFieldForAggregateExpression( exp, - avg.dataType, + LongType, carbonTable, newColumnName, "count") @@ -306,16 +297,11 @@ object PreAggregateUtil { /** * Below method will be used to get the field and its data map field object * for aggregate expression - * @param expression - * expression in aggregate function - * @param dataType - * data type - * @param carbonTable - * parent carbon table - * @param newColumnName - * column name of aggregate table - * @param aggregationName - * aggregate function name + * @param expression expression in aggregate function + * @param dataType data type + * @param carbonTable parent carbon table + * @param newColumnName column name of aggregate table + * @param aggregationName aggregate function name * @return field and its metadata tuple */ def createFieldForAggregateExpression( @@ -552,8 +538,7 @@ object PreAggregateUtil { * Below method will be used to update logical plan * this is required for creating pre aggregate tables, * so @CarbonPreAggregateRules will not be applied during creation - * @param logicalPlan - * actual logical plan + * @param logicalPlan actual logical plan * @return updated plan */ def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = { @@ -654,10 +639,8 @@ object PreAggregateUtil { /** * Below method will be used to get the select query when rollup policy is * applied in case of timeseries table - * @param tableSchema - * main data map schema - * @param selectedDataMapSchema - * selected data map schema for rollup + * @param tableSchema main data map schema + * @param selectedDataMapSchema selected data map schema for rollup * @return select query based on rolloup */ def createTimeseriesSelectQueryForRollup( @@ -695,10 +678,8 @@ object PreAggregateUtil { * Below method will be used to 
creating select query for timeseries * for lowest level for aggergation like second level, in that case it will * hit the maintable - * @param tableSchema - * data map schema - * @param parentTableName - * parent schema + * @param tableSchema data map schema + * @param parentTableName parent schema * @return select query for loading */ def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema, @@ -728,10 +709,8 @@ object PreAggregateUtil { /** * Below method will be used to select rollup table in case of * timeseries data map loading - * @param list - * list of timeseries datamap - * @param dataMapSchema - * datamap schema + * @param list list of timeseries datamap + * @param dataMapSchema datamap schema * @return select table name */ def getRollupDataMapNameForTimeSeries( @@ -748,4 +727,132 @@ object PreAggregateUtil { rollupDataMapSchema.lastOption } } + + /** + * Below method will be used to validate aggregate function and get the attribute information + * which is applied on select query. 
+ * Currently sum, max, min, count, avg is supported + * in case of any other aggregate function it will return empty sequence + * In case of avg it will return two fields one for count + * and other of sum of that column to support rollup + * + * @param aggExp aggregate expression + * @return list of fields + */ + def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression): + Seq[AggregateExpression] = { + aggExp.aggregateFunction match { + case Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + Seq(AggregateExpression(Sum(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct)) + case Sum(_: Expression) => + Seq(aggExp) + case Count(MatchCastExpression(exp: Seq[Expression], changeDataType: DataType)) => + Seq(AggregateExpression(Count(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct)) + case Count(_: Seq[Expression]) => + Seq(aggExp) + case Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + Seq(AggregateExpression(Min(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct)) + case Min(exp: Expression) => + Seq(aggExp) + case Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + Seq(AggregateExpression(Max(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct)) + case Max(exp: Expression) => + Seq(aggExp) + // in case of average need to return two columns + // sum and count of the column to added during table creation to support rollup + case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => + Seq(AggregateExpression(Sum(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct), + AggregateExpression(Count(Cast( + exp, + changeDataType)), + aggExp.mode, + aggExp.isDistinct)) + // in case of average need to return two columns + // sum and count of the column to added during table creation to support rollup + case Average(exp: Expression) => + Seq(AggregateExpression(Sum(exp), + aggExp.mode, + 
aggExp.isDistinct), + AggregateExpression(Count(exp), + aggExp.mode, + aggExp.isDistinct)) + case _ => + Seq.empty + } + } + + /** + * Below method will be used to get the logical plan from aggregate expression + * @param aggExp aggregate expression + * @param tableName parent table name + * @param databaseName database name + * @param logicalRelation logical relation + * @return logical plan + */ + def getLogicalPlanFromAggExp(aggExp: AggregateExpression, + tableName: String, + databaseName: String, + logicalRelation: LogicalRelation, + sparkSession: SparkSession, + parser: CarbonSpark2SqlParser): LogicalPlan = { + // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not + // be applied + val query = parser.addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName") + // updating the logical relation of logical plan to so when two logical plan + // will be compared it will not consider relation + updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation) + } + + /** + * Below method will be used to update the logical plan of expression + * with parent table logical relation + * @param logicalPlan logial plan + * @param logicalRelation maintable logical relation + * @return updated plan + */ + def updateLogicalRelation(logicalPlan: LogicalPlan, + logicalRelation: LogicalRelation): LogicalPlan = { + logicalPlan transform { + case l: LogicalRelation => + l.copy(relation = logicalRelation.relation) + } + } + + /** + * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference` + * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we + * do not use `BindReferences` here as the plan may take the expression as a parameter with type + * `Attribute`, and replace it with `BoundReference` will cause error. 
+ */ + def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = { + e.transformUp { + case ar: AttributeReference => + val ordinal = input.indexOf(ar.exprId) + if (ordinal == -1) { + ar + } else { + ar.withExprId(ExprId(ordinal)) + } + }.canonicalized.asInstanceOf[T] + } }
