Repository: carbondata Updated Branches: refs/heads/master 0f4ced9fd -> dde0873f7
[CARBONDATA-2333] Add validation for insert overwrite on partition table when datamap is present If any of the datamap is not partitioned on all the partition columns then don't allow insert/load overwrite on the parent table. This closes #2172 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dde0873f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dde0873f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dde0873f Branch: refs/heads/master Commit: dde0873f7d5f95b522b7dedb43268e2d7253c21d Parents: 0f4ced9 Author: kunal642 <[email protected]> Authored: Wed Apr 11 16:52:08 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Mon Apr 30 19:34:24 2018 +0530 ---------------------------------------------------------------------- ...ndardPartitionWithPreaggregateTestCase.scala | 31 ++++++++++ .../carbondata/spark/util/CarbonScalaUtil.scala | 15 ++++- .../org/apache/spark/util/PartitionUtils.scala | 59 +++++++++++++++++++- .../preaaggregate/PreAggregateListeners.scala | 30 +++++++++- .../preaaggregate/PreAggregateTableHelper.scala | 16 ++---- 5 files changed, 132 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala index ce92bab..8a3ae3f 100644 ---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala @@ -543,6 +543,37 @@ class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAnd checkAnswer(sql("select sum(hs_len) from updatetime_8 group by imex"),Seq(Row(40),Row(42),Row(83))) } + test("check partitioning for child tables with various combinations") { + sql("drop table if exists partitionone") + sql( + """ + | CREATE TABLE if not exists partitionone (empname String, id int) + | PARTITIONED BY (year int, month int,day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + "create datamap p7 on table partitionone using 'preaggregate' as select empname, sum(year), sum(day) from partitionone group by empname, year, day") + sql( + "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") + sql( + "create datamap p2 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year") + sql( + "create datamap p3 on table partitionone using 'preaggregate' as select empname, sum(year), sum(month) from partitionone group by empname, year, month") + sql( + "create datamap p4 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") + sql( + "create datamap p5 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, month") + sql( + "create datamap p6 on table partitionone using 'preaggregate' as select empname, sum(year), sum(month) from partitionone group by empname, month, day") + 
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p1")(sqlContext.sparkSession).isHivePartitionTable) + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p2")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size() == 1) + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p3")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size == 2) + assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p4")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size == 3) + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p5")(sqlContext.sparkSession).isHivePartitionTable) + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p6")(sqlContext.sparkSession).isHivePartitionTable) + assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p7")(sqlContext.sparkSession).isHivePartitionTable) + } + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit = { var isValidPlan = false plan.transform { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 37cdc41..b851599 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -18,8 +18,8 @@ package org.apache.carbondata.spark.util import java.{lang, util} +import java.io.IOException import java.lang.ref.Reference -import java.nio.charset.Charset import 
java.text.SimpleDateFormat import java.util.Date @@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel} import org.apache.spark.sql.types._ import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} @@ -496,8 +497,16 @@ object CarbonScalaUtil { if (ex != null) { ex match { case sparkException: SparkException => - if (sparkException.getCause.isInstanceOf[DataLoadingException] || - sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { + if (sparkException.getCause.isInstanceOf[IOException]) { + if (sparkException.getCause.getCause.isInstanceOf[MetadataProcessException]) { + executorMessage = sparkException.getCause.getCause.getMessage + errorMessage = errorMessage + ": " + executorMessage + } else { + executorMessage = sparkException.getCause.getMessage + errorMessage = errorMessage + ": " + executorMessage + } + } else if (sparkException.getCause.isInstanceOf[DataLoadingException] || + sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { executorMessage = sparkException.getCause.getMessage errorMessage = errorMessage + ": " + executorMessage } else if (sparkException.getCause.isInstanceOf[TextParsingException]) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 15e82fe..84d9c47 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -21,12 +21,13 @@ import java.text.SimpleDateFormat import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job -import org.apache.spark.sql.execution.command.AlterPartitionModel +import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField} import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo} @@ -226,4 +227,60 @@ object PartitionUtils { } } } + + /** + * Used to extract PartitionerFields for aggregation datamaps. + * This method will keep generating partitionerFields until the sequence of + * partition column is broken. + * + * For example: if x,y,z are partition columns in main table then child tables will be + * partitioned only if the child table has List("x,y,z", "x,y", "x") as the projection columns. 
+ * + * + */ + def getPartitionerFields(allPartitionColumn: Seq[String], + fieldRelations: mutable.LinkedHashMap[Field, DataMapField]): Seq[PartitionerField] = { + + def generatePartitionerField(partitionColumn: List[String], + partitionerFields: Seq[PartitionerField]): Seq[PartitionerField] = { + partitionColumn match { + case head :: tail => + // Collect the first relation which matched the condition + val validRelation = fieldRelations.zipWithIndex.collectFirst { + case ((field, dataMapField), index) if + dataMapField.columnTableRelationList.getOrElse(Seq()).nonEmpty && + head.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) && + dataMapField.aggregateFunction.isEmpty => + (PartitionerField(field.name.get, + field.dataType, + field.columnComment), allPartitionColumn.indexOf(head)) + } + if (validRelation.isDefined) { + val (partitionerField, index) = validRelation.get + // if relation is found then check if the partitionerFields already found are equal + // to the index of this element. + // If x with index 1 is found then there should be exactly 1 element already found. + // If z with index 2 comes directly after x then this check will be false as 1 + // element is skipped in between and index would be 2 and number of elements found + // would be 1. In that case return empty sequence so that the aggregate table is not + // partitioned on any column. + if (index == partitionerFields.length) { + generatePartitionerField(tail, partitionerFields :+ partitionerField) + } else { + Seq.empty + } + } else { + // if not found then continue search for the rest of the elements. Because the rest + // of the elements can also decide if the table has to be partitioned or not. + generatePartitionerField(tail, partitionerFields) + } + case Nil => + // if end of list then return fields.
+ partitionerFields + } + } + + generatePartitionerField(allPartitionColumn.toList, Seq.empty) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index cb1c11b..5e11884 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -1,4 +1,4 @@ -/* + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -428,6 +428,32 @@ object LoadPostAggregateListener extends OperationEventListener { val carbonLoadModel = carbonLoadModelOption.get val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (CarbonUtil.hasAggregationDataMap(table)) { + val isOverwrite = + operationContext.getProperty("isOverwrite").asInstanceOf[Boolean] + if (isOverwrite && table.isHivePartitionTable) { + val parentPartitionColumns = table.getPartitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName) + val childTablesWithoutPartitionColumns = + table.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema => + val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala + val partitionColExists = parentPartitionColumns.forall { + partition => + childColumns.exists { childColumn => + childColumn.getAggFunction.isEmpty && + childColumn.getParentColumnTableRelations.asScala.head.getColumnName. + equals(partition) + } + } + !partitionColExists + } + if (childTablesWithoutPartitionColumns.nonEmpty) { + throw new MetadataProcessException( + "Cannot execute load overwrite or insert overwrite as the following aggregate tables" + + s" ${ + childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName) + } are not partitioned on all the partition column. 
Drop these to continue") + } + } // getting all the aggregate datamap schema + val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala + .filter(_.isInstanceOf[AggregationDataMapSchema]) @@ -440,8 +466,6 @@ object LoadPostAggregateListener extends OperationEventListener { .asInstanceOf[CarbonLoadDataCommand] childLoadCommand.dataFrame = Some(PreAggregateUtil .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get)) - val isOverwrite = - operationContext.getProperty("isOverwrite").asInstanceOf[Boolean] childLoadCommand.operationContext = operationContext val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent") val (parentTableIdentifier, segmentToLoad) = http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala index 2862d96..cef6cb8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException import org.apache.carbondata.common.logging.LogServiceFactory @@ -76,18 +77,9 @@ case class
PreAggregateTableHelper( Seq() } // Generate child table partition columns in the same order as the parent table. - val partitionerFields = fieldRelationMap.collect { - case (field, dataMapField) if parentPartitionColumns - .exists(parentCol => - /* For count(*) while Pre-Aggregate table creation,columnTableRelationList was null */ - dataMapField.columnTableRelationList.getOrElse(Seq()).nonEmpty && - parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) && - dataMapField.aggregateFunction.isEmpty) => - (PartitionerField(field.name.get, - field.dataType, - field.columnComment), parentPartitionColumns - .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName)) - }.toSeq.sortBy(_._2).map(_._1) + val partitionerFields = + PartitionUtils.getPartitionerFields(parentPartitionColumns, fieldRelationMap) + dmProperties.foreach(t => tableProperties.put(t._1, t._2)) val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
