[CARBONDATA-1953] Pre-aggregate should inherit sort columns, sort_scope, and dictionary encoding
The pre-aggregate table should inherit the main table's properties: sort column order, sort scope, and table block size. This closes #1742 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d6eb12f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d6eb12f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d6eb12f Branch: refs/heads/branch-1.3 Commit: 2d6eb12f54d4e2e2563c7c29730bd8a6db829522 Parents: c100251 Author: BJangir <[email protected]> Authored: Fri Dec 29 19:58:38 2017 +0530 Committer: kumarvishal <[email protected]> Committed: Wed Jan 3 15:45:19 2018 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggregateMisc.scala | 35 +++++++++++++++++++- .../command/carbonTableSchemaCommon.scala | 26 ++++++++------- .../CreatePreAggregateTableCommand.scala | 25 ++++++++++++-- 3 files changed, 70 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d6eb12f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala index b716124..02314d7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateMisc.scala @@ -16,10 +16,12 @@ */ package 
org.apache.carbondata.integration.spark.testsuite.preaggregate -import org.apache.spark.sql.Row +import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, Ignore} +import org.apache.carbondata.core.constants.CarbonCommonConstants + @Ignore class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { @@ -38,6 +40,37 @@ class TestPreAggregateMisc extends QueryTest with BeforeAndAfterAll { sql("drop datamap agg1 on table mainTable") } + test("check preagg tbl properties sort columns inherit from main tbl") { + sql("drop table if exists y ") + sql( + "create table y(year int,month int,name string,salary int) stored by 'carbondata' " + + "tblproperties('NO_INVERTED_INDEX'='name','sort_scope'='Global_sort'," + + "'table_blocksize'='23','Dictionary_include'='month','Dictionary_exclude'='year,name'," + + "'sort_columns'='month,year,name')") + sql("insert into y select 10,11,'babu',12") + sql( + "create datamap y1_sum1 on table y using 'preaggregate' as select year,month,name,sum" + + "(salary) from y group by year,month,name") + + val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "y")(sqlContext.sparkSession) + val datamaptable = CarbonEnv + .getCarbonTable(Some("default"), "y_y1_sum1")(sqlContext.sparkSession) + + val sortcolumns = datamaptable.getTableInfo.getFactTable.getTableProperties + .get(CarbonCommonConstants.SORT_COLUMNS) + val sortcolummatch = sortcolumns != null && sortcolumns.equals("y_month,y_year,y_name") + + val sortscope = datamaptable.getTableInfo.getFactTable.getTableProperties.get("sort_scope") + val sortscopematch = sortscope != null && sortscope.equals( + carbonTable.getTableInfo.getFactTable.getTableProperties.get("sort_scope")) + val blockSize = datamaptable.getTableInfo.getFactTable.getTableProperties + .get(CarbonCommonConstants.TABLE_BLOCKSIZE) + val blocksizematch = blockSize != null && + 
blockSize.equals(carbonTable.getTableInfo.getFactTable.getTableProperties. + get(CarbonCommonConstants.TABLE_BLOCKSIZE)) + assert(sortcolummatch && sortscopematch && blocksizematch) + } + override def afterAll: Unit = { sql("drop table if exists mainTable") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d6eb12f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 1e368cf..b76cfcf 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -581,20 +581,22 @@ class TableNewProcessor(cm: TableModel) { updateColumnGroupsInFields(cm.columnGroups, allColumns) - // Setting the boolean value of useInvertedIndex in column schema - val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq()) - LOGGER.info("NoINVERTEDINDEX columns are : " + noInvertedIndexCols.mkString(",")) - for (column <- allColumns) { - // When the column is measure or the specified no inverted index column in DDL, - // set useInvertedIndex to false, otherwise true. 
- if (noInvertedIndexCols.contains(column.getColumnName) || - cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) { - column.setUseInvertedIndex(false) - } else { - column.setUseInvertedIndex(true) + // Setting the boolean value of useInvertedIndex in column schema, if Paranet table is defined + // Encoding is already decided above + if (!cm.parentTable.isDefined) { + val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq()) + LOGGER.info("NoINVERTEDINDEX columns are : " + noInvertedIndexCols.mkString(",")) + for (column <- allColumns) { + // When the column is measure or the specified no inverted index column in DDL, + // set useInvertedIndex to false, otherwise true. + if (noInvertedIndexCols.contains(column.getColumnName) || + cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) { + column.setUseInvertedIndex(false) + } else { + column.setUseInvertedIndex(true) + } } } - // Adding dummy measure if no measure is provided if (measureCount == 0) { val encoders = new java.util.ArrayList[Encoding]() http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d6eb12f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index c532888..3e86233 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command.preaaggregate +import scala.collection.JavaConverters._ import 
scala.collection.mutable import org.apache.spark.sql._ @@ -56,6 +57,26 @@ case class CreatePreAggregateTableCommand( val tableProperties = mutable.Map[String, String]() dmProperties.foreach(t => tableProperties.put(t._1, t._2)) + val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) + assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), + "Parent table name is different in select and create") + + + var neworder = Seq[String]() + val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala + parentOrder.foreach(parentcol => + fields.filter(col => (fieldRelationMap.get(col).get.aggregateFunction.isEmpty) && + (parentcol.equals(fieldRelationMap.get(col).get. + columnTableRelationList.get(0).parentColumnName))) + .map(cols => neworder :+= cols.column) + ) + tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(",")) + tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable. + getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants + .LOAD_SORT_SCOPE_DEFAULT)) + tableProperties + .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) + // prepare table model of the collected tokens val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( ifNotExistPresent = false, @@ -68,9 +89,7 @@ case class CreatePreAggregateTableCommand( isAlterFlow = false, None) - val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) - assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), - "Parent table name is different in select and create") + // updating the relation identifier, this will be stored in child table // which can be used during dropping of pre-aggreate table as parent table will // also get updated
