Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 9f73f0e60 -> 1137c285f


[CARBONDATA-1763] Drop the table if an exception is thrown during creation

The preaggregate table is not dropped when its creation fails because:

- Exceptions thrown from undoMetadata are not handled.
- If the preaggregate table is not registered with the main table (i.e. the
  main table update fails), it is not dropped from the metastore.
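
For context, a minimal Scala sketch of the create-with-rollback pattern this
fix relies on (illustrative names only, not the actual CarbonData API): the
undo step must run whenever creation fails, and a failure inside the undo
step itself must not mask the original exception.

def runWithUndo(create: () => Unit, undo: Exception => Unit): Unit = {
  try {
    create()
  } catch {
    case e: Exception =>
      try {
        undo(e) // roll back whatever metadata was already written
      } catch {
        case undoEx: Exception =>
          // log and continue: the original failure is the one to surface
          println(s"undo failed: ${undoEx.getMessage}")
      }
      throw e
  }
}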

This closes #1951


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1137c285
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1137c285
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1137c285

Branch: refs/heads/branch-1.3
Commit: 1137c285f55dfdc0de24bdebf81d78187df93f8a
Parents: 9f73f0e
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Thu Feb 8 11:50:23 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Feb 9 16:22:37 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  2 +-
 .../datamap/CarbonDropDataMapCommand.scala      | 29 ++++++++++++++++----
 .../CreatePreAggregateTableCommand.scala        |  3 +-
 4 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1137c285/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c43d58..8ed7623 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -546,7 +546,7 @@ object CarbonDataRDDFactory {
           LOGGER.error(ex, "Problem while committing data maps")
           false
       }
-      if (!done && !commitComplete) {
+      if (!done || !commitComplete) {
        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
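
The one-character operator change above is the whole fix in this hunk:
cleanup must run when either the load ("done") or the datamap commit
("commitComplete") failed. A tiny sketch of the case the old condition
missed:

// done = load succeeded; commitComplete = datamap commit failed
val done = true
val commitComplete = false
assert(!(!done && !commitComplete)) // old guard is false: cleanup skipped (the bug)
assert(!done || !commitComplete)    // new guard is true: cleanup runs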

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1137c285/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f2f001e..0fd5437 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -123,7 +123,7 @@ case class CarbonCreateDataMapCommand(
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      if (!tableIsExists) {
+      if (!tableIsExists && createPreAggregateTableCommands != null) {
         createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
       } else {
         Seq.empty
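
Why the added null check matters: the inner command is assigned only partway
through creation, so an early failure can reach undoMetadata before it
exists. A hypothetical sketch of that window (simplified names, not the real
classes):

class CreateSketch {
  // mirrors createPreAggregateTableCommands: assigned late during creation
  private var childCommand: String = _

  def processMetadata(): Unit = {
    // validation throws here, before childCommand is ever assigned...
    throw new IllegalStateException("datamap validation failed")
  }

  def undoMetadata(): Unit = {
    // ...so the undo path must null-check before delegating (the fix)
    if (childCommand != null) println(s"rolling back $childCommand")
  }
}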

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1137c285/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index bc55988..8ef394c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -47,7 +47,8 @@ case class CarbonDropDataMapCommand(
     dataMapName: String,
     ifExistsSet: Boolean,
     databaseNameOp: Option[String],
-    tableName: String)
+    tableName: String,
+    forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
   var commandToRun: CarbonDropTableCommand = _
@@ -74,6 +75,10 @@ case class CarbonDropDataMapCommand(
         case ex: NoSuchTableException =>
           throw ex
       }
+      // If the datamap to be dropped is registered with the parent table, drop it from the
+      // metastore and remove its entry from the parent table.
+      // If forceDrop is true, remove the datamap table from the Hive metastore directly; there
+      // is no need to remove it from the parent, as the first condition would have handled that.
      if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
        val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -85,7 +90,6 @@ case class CarbonDropDataMapCommand(
               ifExistsSet,
               sparkSession)
           OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, 
operationContext)
-
           carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
           val schemaConverter = new ThriftWrapperSchemaConverterImpl
           PreAggregateUtil.updateSchemaInfo(
@@ -111,13 +115,28 @@ case class CarbonDropDataMapCommand(
         } else if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
-      } else if ((carbonTable.isDefined &&
-        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+      } else if (forceDrop) {
+        val childCarbonTable: Option[CarbonTable] = try {
+          val childTableName = tableName + "_" + dataMapName
+          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
+        } catch {
+          case _: Exception =>
+            None
+        }
+        if (childCarbonTable.isDefined) {
+          commandToRun = CarbonDropTableCommand(
+            ifExistsSet = true,
+            Some(childCarbonTable.get.getDatabaseName),
+            childCarbonTable.get.getTableName,
+            dropChildTable = true)
+          commandToRun.processMetadata(sparkSession)
+        }
+      } else if (carbonTable.isDefined &&
+        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
       }
-
     } catch {
       case e: NoSuchDataMapException =>
         throw e
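
The force-drop branch locates the child table purely by the naming
convention visible in the hunk (tableName + "_" + dataMapName); a one-line
sketch of that convention:

// child (preaggregate) table name, per the lookup in the hunk above
def childTableName(parentTable: String, dataMapName: String): String =
  s"${parentTable}_$dataMapName"

// e.g. childTableName("maintable", "agg0") == "maintable_agg0"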

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1137c285/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 17d6882..6017862 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -162,7 +162,8 @@ case class CreatePreAggregateTableCommand(
       dataMapName,
       ifExistsSet = true,
       parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
+      parentTableIdentifier.table,
+      forceDrop = true).run(sparkSession)
     Seq.empty
   }
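
Putting it together, a hedged end-to-end illustration (assumes a
SparkSession named "spark" and an existing Carbon table "maintable"): if
this CREATE DATAMAP fails partway through, the child table maintable_agg0
is now force-dropped instead of being left behind in the metastore.

spark.sql(
  """CREATE DATAMAP agg0 ON TABLE maintable
    |USING 'preaggregate'
    |AS SELECT name, sum(price) FROM maintable GROUP BY name
  """.stripMargin)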
 
