added AlterTableAddColumnRDD to AlterTableCommands

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/36112fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/36112fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/36112fa4

Branch: refs/heads/branch-1.1
Commit: 36112fa46d2a15e6403af97a7d8677231bedc8d0
Parents: f27b491
Author: kunal642 <kunal.kap...@knoldus.in>
Authored: Tue Apr 11 14:15:10 2017 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Fri Apr 14 16:16:52 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  6 +-
 .../execution/command/carbonTableSchema.scala   |  8 +--
 .../execution/command/AlterTableCommands.scala  | 62 ++++++++++++--------
 .../org/apache/spark/util/AlterTableUtil.scala  | 16 +++--
 .../restructure/AlterTableRevertTestCase.scala  | 19 +++++-
 5 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index bb65b0b..ab1fd9c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,13 +48,12 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
  */
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
-    alterTableModel: AlterTableAddColumnsModel,
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
-      new DropColumnPartition(id, column._2, column._1)
+      new AddColumnPartition(id, column._2, column._1)
     }.toArray
   }
 
@@ -65,7 +63,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {
-        val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
+        val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
         // create dictionary file if it is a dictionary column
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
             !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
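
Two fixes land in this hunk: the RDD now uses its own AddColumnPartition type instead of DropColumnPartition (apparently left over from a copy of AlterTableDropColumnRDD), and the unused alterTableModel constructor parameter is dropped. A minimal sketch of the resulting driver-side invocation, assuming newCols and carbonTable are already in scope:

    // Sketch only: the RDD creates one partition per new column and, for
    // dictionary-encoded columns, writes the initial dictionary file;
    // collect() forces the job and returns one (Int, String) status per column.
    val results: Array[(Int, String)] = new AlterTableAddColumnRDD(
      sparkSession.sparkContext,
      newCols, // Seq[ColumnSchema] for the newly added columns
      carbonTable.getCarbonTableIdentifier,
      carbonTable.getStorePath).collect()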

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 117b365..5108df8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -152,7 +152,7 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
     tableName: String,
     columns: List[String])
 
-class AlterTableProcessor(
+class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,
     tableInfo: TableInfo,
@@ -253,12 +253,6 @@ class AlterTableProcessor(
         }
       }
     }
-    // generate dictionary files for the newly added columns
-    new AlterTableAddColumnRDD(sc,
-      newCols,
-      alterTableModel,
-      tableIdentifier,
-      storePath).collect()
     tableSchema.setListOfColumns(allColumns.asJava)
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
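
The rename from AlterTableProcessor to AlterTableColumnSchemaGenerator matches the class's narrowed job: it now only builds and registers the new column schemas, while the dictionary-file generation it used to trigger moves into the command itself (see the AlterTableCommands.scala hunks below). A sketch of the resulting two-step flow on the command side, using the same names the diff uses:

    // Step 1: derive the new column schemas (no dictionary side effects here).
    val newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
      dbName, wrapperTableInfo, carbonTablePath,
      carbonTable.getCarbonTableIdentifier,
      carbonTable.getStorePath, sparkSession.sparkContext).process
    // Step 2: generate dictionary files, now inside the command's try block so
    // a failure can be cleaned up with AlterTableDropColumnRDD.
    new AlterTableAddColumnRDD(sparkSession.sparkContext, newCols,
      carbonTable.getCarbonTableIdentifier, carbonTable.getStorePath).collect()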

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index e380217..8b194da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -28,17 +28,15 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
 
 private[sql] case class AlterTableAddColumns(
@@ -52,13 +50,15 @@ private[sql] case class AlterTableAddColumns(
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table add columns request has been received for 
$dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, 
LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, 
LOGGER)(sparkSession)
-    // get the latest carbon table and check for column existence
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + 
tableName)
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
     var newCols = 
Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + 
tableName)
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, 
LOGGER)(sparkSession)
+      // get the latest carbon table and check for column existence
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // read the latest schema file
       val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -71,12 +71,17 @@ private[sql] case class AlterTableAddColumns(
           dbName,
           tableName,
           carbonTable.getStorePath)
-      newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
         carbonTablePath,
         carbonTable.getCarbonTableIdentifier,
         carbonTable.getStorePath, sparkSession.sparkContext).process
+      // generate dictionary files for the newly added columns
+      new AlterTableAddColumnRDD(sparkSession.sparkContext,
+        newCols,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
@@ -92,7 +97,7 @@ private[sql] case class AlterTableAddColumns(
     } catch {
       case e: Exception => LOGGER
         .error("Alter table add columns failed :" + e.getMessage)
-        if (!newCols.isEmpty) {
+        if (newCols.nonEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add 
operation failed")
           new AlterTableDropColumnRDD(sparkSession.sparkContext,
             newCols,
@@ -100,7 +105,7 @@ private[sql] case class AlterTableAddColumns(
             carbonTable.getStorePath).collect()
         }
         AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table add column operation failed. Please check the logs")
+        sys.error(s"Alter table add operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -147,12 +152,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LockUsage.DELETE_SEGMENT_LOCK,
       LockUsage.CLEAN_FILES_LOCK,
       LockUsage.DROP_TABLE_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
-        sparkSession)
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
     val carbonTable = relation.tableMeta.carbonTable
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
+          sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -197,7 +204,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
             sparkSession)
         renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-        sys.error("Alter table rename table operation failed. Please check the 
logs")
+        sys.error(s"Alter table rename table operation failed: 
${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -242,13 +249,15 @@ private[sql] case class AlterTableDropColumns(
     val dbName = alterTableDropColumnModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table drop columns request has been received for 
$dbName.$tableName")
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
     // get the latest carbon table and check for column existence
    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -318,7 +327,7 @@ private[sql] case class AlterTableDropColumns(
       case e: Exception => LOGGER
         .error("Alter table drop columns failed : " + e.getMessage)
         AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table drop column operation failed. Please check the 
logs")
+        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -338,15 +347,16 @@ private[sql] case class AlterTableDataTypeChange(
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table change data type request has been received for 
$dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, 
LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, 
LOGGER)(sparkSession)
+    var locks = List.empty[ICarbonLock]
     // get the latest carbon table and check for column existence
    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+    var lastUpdatedTime = 0L
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
         LOGGER.audit(s"Alter table change data type request has failed. " +
                      s"Column $columnName does not exist")
@@ -397,7 +407,7 @@ private[sql] case class AlterTableDataTypeChange(
       case e: Exception => LOGGER
         .error("Alter table change datatype failed : " + e.getMessage)
        AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table data type change operation failed. Please check the logs")
+        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
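
All four commands in this file now share the same hardening pattern: locks and lastUpdatedTime are declared before the try and assigned inside it, so a failure during lock acquisition still flows through catch and finally, the revert uses whatever timestamp was captured, and only locks actually acquired get released. The error raised to the user now carries the underlying message instead of a generic "check the logs". The skeleton of that pattern, as a sketch (the revert call shown is the add-columns variant; it differs per command):

    var locks = List.empty[ICarbonLock]
    var lastUpdatedTime = 0L
    try {
      // Acquire inside the try: if this throws, finally still runs with an empty list.
      locks = AlterTableUtil
        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
      // ... apply the schema change ...
    } catch {
      case e: Exception =>
        // Revert schema entries newer than lastUpdatedTime, then surface the cause.
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
        sys.error(s"Alter table add operation failed: ${e.getMessage}")
    } finally {
      AlterTableUtil.releaseLocks(locks, LOGGER)
    }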

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 5057d75..f5248f5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -64,11 +64,17 @@ object AlterTableUtil {
     }
     // acquire the lock first
     val table = relation.tableMeta.carbonTable
-    var acquiredLocks = ListBuffer[ICarbonLock]()
-    locksToBeAcquired.foreach { lock =>
-      acquiredLocks += getLockObject(table, lock, LOGGER)
+    val acquiredLocks = ListBuffer[ICarbonLock]()
+    try {
+      locksToBeAcquired.foreach { lock =>
+        acquiredLocks += getLockObject(table, lock, LOGGER)
+      }
+      acquiredLocks.toList
+    } catch {
+      case e: Exception =>
+        releaseLocks(acquiredLocks.toList, LOGGER)
+        throw e
     }
-    acquiredLocks.toList
   }
 
   /**
@@ -249,7 +255,7 @@ object AlterTableUtil {
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime > lastUpdatedTime) {
-      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      LOGGER.info(s"Reverting changes for $dbName.$tableName")
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
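
validateTableAndAcquireLock above now cleans up after itself: if any lock in the list fails to be acquired, the locks already taken are released before the exception propagates, so callers never inherit a partially locked table. The acquire-all-or-release shape in isolation, as a sketch using the same names as the diff:

    val acquiredLocks = ListBuffer[ICarbonLock]()
    try {
      locksToBeAcquired.foreach { lock =>
        acquiredLocks += getLockObject(table, lock, LOGGER) // throws if unavailable
      }
      acquiredLocks.toList // every requested lock is held; hand them to the caller
    } catch {
      case e: Exception =>
        releaseLocks(acquiredLocks.toList, LOGGER) // undo the partial acquisition
        throw e
    }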

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 05b79a8..c9244bc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.metadata.CarbonMetadata
 
 class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -41,7 +42,7 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
-        "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+        "('DEFAULT.VALUE.newField'='def')")
       hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
@@ -78,6 +79,22 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
   }
 
+  test("test to check if dictionary files are deleted for new column if query 
fails") {
+    intercept[RuntimeException] {
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+      sql(
+        "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+        "('DEFAULT.VALUE.newField'='def')")
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+      intercept[AnalysisException] {
+        sql("select newField from reverttest")
+      }
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default_reverttest")
+
+      assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+    }
+  }
+
   override def afterAll() {
     hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     sql("drop table if exists reverttest")
