added AlterTableAddColumnRDD to AlterTableCommands
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/36112fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/36112fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/36112fa4

Branch: refs/heads/branch-1.1
Commit: 36112fa46d2a15e6403af97a7d8677231bedc8d0
Parents: f27b491
Author: kunal642 <kunal.kap...@knoldus.in>
Authored: Tue Apr 11 14:15:10 2017 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Fri Apr 14 16:16:52 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  6 +-
 .../execution/command/carbonTableSchema.scala   |  8 +--
 .../execution/command/AlterTableCommands.scala  | 62 ++++++++++++--------
 .../org/apache/spark/util/AlterTableUtil.scala  | 16 +++--
 .../restructure/AlterTableRevertTestCase.scala  | 19 +++++-
 5 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index bb65b0b..ab1fd9c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd

 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,13 +48,12 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
  */
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
-    alterTableModel: AlterTableAddColumnsModel,
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {

   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
-      new DropColumnPartition(id, column._2, column._1)
+      new AddColumnPartition(id, column._2, column._1)
     }.toArray
   }

@@ -65,7 +63,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {
-        val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
+        val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
         // create dictionary file if it is a dictionary column
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
             !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
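Note on the change above: AlterTableAddColumnRDD previously built its splits from DropColumnPartition (and cast back to it when computing a partition), apparently a copy-paste from the drop-column RDD; since both partition classes carry the same fields this happened to work, but the RDD now uses its own AddColumnPartition. The unused AlterTableAddColumnsModel constructor parameter is also dropped, so the RDD can be built from just the new column schemas and the table identity. A hedged sketch of the resulting call site (newCols and carbonTable stand for values the caller already has; this mirrors the call added in AlterTableCommands below):

    // One partition per new column; collect() drives dictionary-file
    // creation for dictionary-encoded (non-direct-dictionary) columns.
    new AlterTableAddColumnRDD(sparkSession.sparkContext,
      newCols,                              // Seq[ColumnSchema] being added
      carbonTable.getCarbonTableIdentifier, // identifies the target table
      carbonTable.getStorePath              // store path holding the dictionaries
    ).collect()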
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 117b365..5108df8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -152,7 +152,7 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
     tableName: String,
     columns: List[String])

-class AlterTableProcessor(
+class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,
     tableInfo: TableInfo,
@@ -253,12 +253,6 @@ class AlterTableProcessor(
         }
       }
     }
-    // generate dictionary files for the newly added columns
-    new AlterTableAddColumnRDD(sc,
-      newCols,
-      alterTableModel,
-      tableIdentifier,
-      storePath).collect()
     tableSchema.setListOfColumns(allColumns.asJava)
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
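The rename from AlterTableProcessor to AlterTableColumnSchemaGenerator tracks the class's narrowed responsibility: it now only derives the new ColumnSchema list, and dictionary-file generation moves out to the command (next file). Keeping the side-effecting step in the command means the failure handler there owns both the creation and the cleanup of the dictionary files. A minimal sketch of the resulting two-step flow, using the names from the patch:

    // Step 1: compute the new column schemas (no files written yet).
    val newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
      dbName, wrapperTableInfo, carbonTablePath,
      carbonTable.getCarbonTableIdentifier, carbonTable.getStorePath,
      sparkSession.sparkContext).process
    // Step 2: materialize dictionary files; on failure the same newCols list
    // feeds AlterTableDropColumnRDD to delete them again.
    new AlterTableAddColumnRDD(sparkSession.sparkContext, newCols,
      carbonTable.getCarbonTableIdentifier, carbonTable.getStorePath).collect()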
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index e380217..8b194da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -28,17 +28,15 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}

 private[sql] case class AlterTableAddColumns(
@@ -52,13 +50,15 @@ private[sql] case class AlterTableAddColumns(
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
-    // get the latest carbon table and check for column existence
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+      // get the latest carbon table and check for column existence
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -71,12 +71,17 @@ private[sql] case class AlterTableAddColumns(
         dbName,
         tableName,
         carbonTable.getStorePath)
-      newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
         carbonTablePath,
         carbonTable.getCarbonTableIdentifier,
         carbonTable.getStorePath,
         sparkSession.sparkContext).process
+      // generate dictionary files for the newly added columns
+      new AlterTableAddColumnRDD(sparkSession.sparkContext,
+        newCols,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
       val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
       schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
@@ -92,7 +97,7 @@ private[sql] case class AlterTableAddColumns(
     } catch {
       case e: Exception => LOGGER
         .error("Alter table add columns failed :" + e.getMessage)
-        if (!newCols.isEmpty) {
+        if (newCols.nonEmpty) {
          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
          new AlterTableDropColumnRDD(sparkSession.sparkContext,
            newCols,
@@ -100,7 +105,7 @@ private[sql] case class AlterTableAddColumns(
            carbonTable.getStorePath).collect()
        }
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table add column operation failed. Please check the logs")
+        sys.error(s"Alter table add operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -147,12 +152,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LockUsage.DELETE_SEGMENT_LOCK,
       LockUsage.CLEAN_FILES_LOCK,
       LockUsage.DROP_TABLE_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
-        sparkSession)
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
     val carbonTable = relation.tableMeta.carbonTable
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
+          sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -197,7 +204,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
           sparkSession)
         renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-        sys.error("Alter table rename table operation failed. Please check the logs")
+        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -242,13 +249,15 @@ private[sql] case class AlterTableDropColumns(
     val dbName = alterTableDropColumnModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+    var locks = List.empty[ICarbonLock]
+    var lastUpdatedTime = 0L
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
     // get the latest carbon table and check for column existence
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -318,7 +327,7 @@ private[sql] case class AlterTableDropColumns(
       case e: Exception => LOGGER
         .error("Alter table drop columns failed : " + e.getMessage)
         AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table drop column operation failed. Please check the logs")
+        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -338,15 +347,16 @@ private[sql] case class AlterTableDataTypeChange(
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+    var locks = List.empty[ICarbonLock]
     // get the latest carbon table and check for column existence
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+    var lastUpdatedTime = 0L
     try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
         LOGGER.audit(s"Alter table change data type request has failed. " +
                      s"Column $columnName does not exist")
@@ -397,7 +407,7 @@ private[sql] case class AlterTableDataTypeChange(
       case e: Exception => LOGGER
         .error("Alter table change datatype failed : " + e.getMessage)
         AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
-        sys.error("Alter table data type change operation failed. Please check the logs")
+        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks, LOGGER)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 5057d75..f5248f5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -64,11 +64,17 @@ object AlterTableUtil {
     }
     // acquire the lock first
     val table = relation.tableMeta.carbonTable
-    var acquiredLocks = ListBuffer[ICarbonLock]()
-    locksToBeAcquired.foreach { lock =>
-      acquiredLocks += getLockObject(table, lock, LOGGER)
+    val acquiredLocks = ListBuffer[ICarbonLock]()
+    try {
+      locksToBeAcquired.foreach { lock =>
+        acquiredLocks += getLockObject(table, lock, LOGGER)
+      }
+      acquiredLocks.toList
+    } catch {
+      case e: Exception =>
+        releaseLocks(acquiredLocks.toList, LOGGER)
+        throw e
     }
-    acquiredLocks.toList
   }
@@ -249,7 +255,7 @@ object AlterTableUtil {
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime > lastUpdatedTime) {
-      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      LOGGER.info(s"Reverting changes for $dbName.$tableName")
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 05b79a8..c9244bc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll

+import org.apache.carbondata.core.metadata.CarbonMetadata

 class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {

@@ -41,7 +42,7 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
-        "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+        "('DEFAULT.VALUE.newField'='def')")
       hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
       intercept[AnalysisException] {
         sql("select newField from reverttest")
@@ -78,6 +79,22 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString ==
       "int")
   }

+  test("test to check if dictionary files are deleted for new column if query fails") {
+    intercept[RuntimeException] {
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+      sql(
+        "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+        "('DEFAULT.VALUE.newField'='def')")
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+      intercept[AnalysisException] {
+        sql("select newField from reverttest")
+      }
+      val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default_reverttest")
+
+      assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+    }
+  }
+
   override def afterAll() {
     hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     sql("drop table if exists reverttest")
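validateTableAndAcquireLock now acquires all requested locks or none: if acquiring a later lock throws, the already-acquired ones are released before the exception is rethrown, which is what lets the commands above treat the returned lock list as all-or-nothing. The new test case exercises the failure path end to end: with Hive authorization forced on, the metastore update fails, and the assertion on the table's metadata directory checks that the dictionary files created for newField were cleaned up by the revert. For reference, the same idiom stated generically as a hedged sketch (not part of the patch):

    import scala.collection.mutable.ListBuffer

    // Acquire-all-or-nothing: on any failure, release what was already
    // acquired, then rethrow so the caller sees the real error.
    def acquireAll[T](items: Seq[T])(acquire: T => AutoCloseable): List[AutoCloseable] = {
      val acquired = ListBuffer[AutoCloseable]()
      try {
        items.foreach { item => acquired += acquire(item) }
        acquired.toList
      } catch {
        case e: Exception =>
          acquired.foreach(_.close()) // free partial acquisitions
          throw e
      }
    }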