Repository: carbondata
Updated Branches:
  refs/heads/master 0f4ced9fd -> dde0873f7


[CARBONDATA-2333] Add validation for insert overwrite on partition table when 
datamap is present

If any of the datamaps is not partitioned on all the partition columns, then do not
allow insert/load overwrite on the parent table.

This closes #2172


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dde0873f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dde0873f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dde0873f

Branch: refs/heads/master
Commit: dde0873f7d5f95b522b7dedb43268e2d7253c21d
Parents: 0f4ced9
Author: kunal642 <[email protected]>
Authored: Wed Apr 11 16:52:08 2018 +0530
Committer: kumarvishal09 <[email protected]>
Committed: Mon Apr 30 19:34:24 2018 +0530

----------------------------------------------------------------------
 ...ndardPartitionWithPreaggregateTestCase.scala | 31 ++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 15 ++++-
 .../org/apache/spark/util/PartitionUtils.scala  | 59 +++++++++++++++++++-
 .../preaaggregate/PreAggregateListeners.scala   | 30 +++++++++-
 .../preaaggregate/PreAggregateTableHelper.scala | 16 ++----
 5 files changed, 132 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
index ce92bab..8a3ae3f 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
@@ -543,6 +543,37 @@ class StandardPartitionWithPreaggregateTestCase extends 
QueryTest with BeforeAnd
     checkAnswer(sql("select sum(hs_len) from updatetime_8 group by 
imex"),Seq(Row(40),Row(42),Row(83)))
   }
 
+  test("check partitioning for child tables with various combinations") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, id int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      "create datamap p7 on table partitionone using 'preaggregate' as select 
empname, sum(year), sum(day) from partitionone group by empname, year, day")
+    sql(
+      "create datamap p1 on table partitionone using 'preaggregate' as select 
empname, sum(year) from partitionone group by empname")
+    sql(
+      "create datamap p2 on table partitionone using 'preaggregate' as select 
empname, sum(year) from partitionone group by empname, year")
+    sql(
+      "create datamap p3 on table partitionone using 'preaggregate' as select 
empname, sum(year), sum(month) from partitionone group by empname, year, month")
+    sql(
+      "create datamap p4 on table partitionone using 'preaggregate' as select 
empname, sum(year) from partitionone group by empname, year, month, day")
+    sql(
+      "create datamap p5 on table partitionone using 'preaggregate' as select 
empname, sum(year) from partitionone group by empname, month")
+    sql(
+      "create datamap p6 on table partitionone using 'preaggregate' as select 
empname, sum(year), sum(month) from partitionone group by empname, month, day")
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p1")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p2")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size()
 == 1)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p3")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size
 == 2)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p4")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size
 == 3)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p5")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p6")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p7")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit 
= {
     var isValidPlan = false
     plan.transform {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 37cdc41..b851599 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -18,8 +18,8 @@
 package org.apache.carbondata.spark.util
 
 import java.{lang, util}
+import java.io.IOException
 import java.lang.ref.Reference
-import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.{DataTypeInfo, 
UpdateTableModel}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
@@ -496,8 +497,16 @@ object CarbonScalaUtil {
     if (ex != null) {
       ex match {
         case sparkException: SparkException =>
-          if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
-              
sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
+          if (sparkException.getCause.isInstanceOf[IOException]) {
+            if 
(sparkException.getCause.getCause.isInstanceOf[MetadataProcessException]) {
+              executorMessage = sparkException.getCause.getCause.getMessage
+              errorMessage = errorMessage + ": " + executorMessage
+            } else {
+              executorMessage = sparkException.getCause.getMessage
+              errorMessage = errorMessage + ": " + executorMessage
+            }
+          } else if 
(sparkException.getCause.isInstanceOf[DataLoadingException] ||
+                     
sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
             executorMessage = sparkException.getCause.getMessage
             errorMessage = errorMessage + ": " + executorMessage
           } else if 
(sparkException.getCause.isInstanceOf[TextParsingException]) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 15e82fe..84d9c47 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -21,12 +21,13 @@ import java.text.SimpleDateFormat
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.sql.execution.command.AlterPartitionModel
+import org.apache.spark.sql.execution.command.{AlterPartitionModel, 
DataMapField, Field, PartitionerField}
 
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, 
TableBlockInfo}
@@ -226,4 +227,60 @@ object PartitionUtils {
       }
     }
   }
+
+  /**
+   * Used to extract PartitionerFields for aggregation datamaps.
+   * This method will keep generating partitionerFields until the sequence of
+   * partition column is broken.
+   *
+   * For example: if x,y,z are partition columns in main table then child 
tables will be
+   * partitioned only if the child table has List("x,y,z", "x,y", "x") as the 
projection columns.
+   *
+   *
+   */
+  def getPartitionerFields(allPartitionColumn: Seq[String],
+      fieldRelations: mutable.LinkedHashMap[Field, DataMapField]): 
Seq[PartitionerField] = {
+
+    def generatePartitionerField(partitionColumn: List[String],
+        partitionerFields: Seq[PartitionerField]): Seq[PartitionerField] = {
+      partitionColumn match {
+        case head :: tail =>
+          // Collect the first relation which matched the condition
+          val validRelation = fieldRelations.zipWithIndex.collectFirst {
+            case ((field, dataMapField), index) if
+            dataMapField.columnTableRelationList.getOrElse(Seq()).nonEmpty &&
+            
head.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+            dataMapField.aggregateFunction.isEmpty =>
+              (PartitionerField(field.name.get,
+                field.dataType,
+                field.columnComment), allPartitionColumn.indexOf(head))
+          }
+          if (validRelation.isDefined) {
+            val (partitionerField, index) = validRelation.get
+            // if relation is found then check if the partitionerFields 
already found are equal
+            // to the index of this element.
+            // If x with index 1 is found then there should be exactly 1 
element already found.
+            // If z with index 2 comes directly after x then this check will 
be false are 1
+            // element is skipped in between and index would be 2 and number 
of elements found
+            // would be 1. In that case return empty sequence so that the 
aggregate table is not
+            // partitioned on any column.
+            if (index == partitionerFields.length) {
+              generatePartitionerField(tail, partitionerFields :+ 
partitionerField)
+            } else {
+              Seq.empty
+            }
+          } else {
+            // if not found then countinue search for the rest of the 
elements. Because the rest
+            // of the elements can also decide if the table has to be 
partitioned or not.
+            generatePartitionerField(tail, partitionerFields)
+          }
+        case Nil =>
+          // if end of list then return fields.
+          partitionerFields
+      }
+    }
+
+    generatePartitionerField(allPartitionColumn.toList, Seq.empty)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index cb1c11b..5e11884 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -1,4 +1,4 @@
-/*
+  /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -428,6 +428,32 @@ object LoadPostAggregateListener extends 
OperationEventListener {
       val carbonLoadModel = carbonLoadModelOption.get
       val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       if (CarbonUtil.hasAggregationDataMap(table)) {
+        val isOverwrite =
+          operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
+        if (isOverwrite && table.isHivePartitionTable) {
+          val parentPartitionColumns = 
table.getPartitionInfo.getColumnSchemaList.asScala
+            .map(_.getColumnName)
+          val childTablesWithoutPartitionColumns =
+            table.getTableInfo.getDataMapSchemaList.asScala.filter { 
dataMapSchema =>
+              val childColumns = 
dataMapSchema.getChildSchema.getListOfColumns.asScala
+              val partitionColExists = parentPartitionColumns.forall {
+                partition =>
+                  childColumns.exists { childColumn =>
+                    childColumn.getAggFunction.isEmpty &&
+                    
childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
+                      equals(partition)
+                  }
+              }
+              !partitionColExists
+            }
+          if (childTablesWithoutPartitionColumns.nonEmpty) {
+            throw new MetadataProcessException(
+              "Cannot execute load overwrite or insert overwrite as the 
following aggregate tables"
+              + s" ${
+                
childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
+              } are not partitioned on all the partition column. Drop these to 
continue")
+          }
+        }
         // getting all the aggergate datamap schema
         val aggregationDataMapList = 
table.getTableInfo.getDataMapSchemaList.asScala
           .filter(_.isInstanceOf[AggregationDataMapSchema])
@@ -440,8 +466,6 @@ object LoadPostAggregateListener extends 
OperationEventListener {
             .asInstanceOf[CarbonLoadDataCommand]
           childLoadCommand.dataFrame = Some(PreAggregateUtil
             .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
-          val isOverwrite =
-            operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
           childLoadCommand.operationContext = operationContext
           val timeseriesParent = 
childLoadCommand.internalOptions.get("timeseriesParent")
           val (parentTableIdentifier, segmentToLoad) =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dde0873f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index 2862d96..cef6cb8 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.PartitionUtils
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -76,18 +77,9 @@ case class PreAggregateTableHelper(
       Seq()
     }
     // Generate child table partition columns in the same order as the parent 
table.
-    val partitionerFields = fieldRelationMap.collect {
-      case (field, dataMapField) if parentPartitionColumns
-        .exists(parentCol =>
-          /* For count(*) while Pre-Aggregate table 
creation,columnTableRelationList was null */
-          dataMapField.columnTableRelationList.getOrElse(Seq()).nonEmpty &&
-            
parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName)
 &&
-          dataMapField.aggregateFunction.isEmpty) =>
-        (PartitionerField(field.name.get,
-          field.dataType,
-          field.columnComment), parentPartitionColumns
-          
.indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
-    }.toSeq.sortBy(_._2).map(_._1)
+    val partitionerFields =
+      PartitionUtils.getPartitionerFields(parentPartitionColumns, 
fieldRelationMap)
+
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
     val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)

Reply via email to