[CARBONDATA-1229] acquired meta.lock during table drop. This closes #1153
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/403c3d9b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/403c3d9b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/403c3d9b Branch: refs/heads/metadata Commit: 403c3d9b41e166311ac45ec33b375cbecc8c4741 Parents: 619f1f9 Author: kunalkapoor <kunalkapoor...@gmail.com> Authored: Mon Jul 10 12:12:10 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Mon Jul 10 19:32:43 2017 +0530 ---------------------------------------------------------------------- .../carbondata/core/locks/CarbonLockUtil.java | 24 +++++++++ .../execution/command/carbonTableSchema.scala | 52 +++++++++----------- .../org/apache/spark/util/AlterTableUtil.scala | 25 +--------- 3 files changed, 50 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java index fba03a1..eaaaf94 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; /** * This class contains all carbon lock utilities @@ -60,4 +61,27 @@ public class CarbonLockUtil { } } } + + /** + * Given a lock type this method will return a new lock object if not acquired by any other + * operation + * + * @param carbonTable 
+ * @param lockType + * @return + */ + public static ICarbonLock getLockObject(CarbonTable carbonTable, + String lockType) { + ICarbonLock carbonLock = CarbonLockFactory + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(), + lockType); + LOGGER.info("Trying to acquire lock: " + carbonLock); + if (carbonLock.lockWithRetries()) { + LOGGER.info("Successfully acquired the lock " + carbonLock); + } else { + throw new RuntimeException("Table is locked for updation. Please try after some time"); + } + return carbonLock; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 8e7db45..2e5812c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -17,9 +17,8 @@ package org.apache.spark.sql.execution.command -import java.io.File - import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.commons.lang3.StringUtils @@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation} import org.apache.spark.util.FileUtils import org.codehaus.jackson.map.ObjectMapper @@ 
-41,10 +40,10 @@ import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} @@ -834,24 +833,17 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) val identifier = TableIdentifier(tableName, Option(dbName)) val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") - val carbonLock = CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName + CarbonCommonConstants.UNDERSCORE + - LockUsage.DROP_TABLE_LOCK) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val storePath = catalog.storePath - var isLocked = false catalog.checkSchemasModifiedTimeAndReloadTables() + val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { - isLocked = carbonLock.lockWithRetries() - if (isLocked) { - logInfo("Successfully able to get the lock for drop.") - } - else { - LOGGER.audit(s"Dropping table $dbName.$tableName failed as 
the Table is locked") - sys.error("Table is locked for deletion. Please try after some time") + val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull + locksToBeAcquired foreach { + lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock) } LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") - val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull if (null != carbonTable) { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) @@ -859,18 +851,22 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, CarbonEnv.getInstance(sparkSession).carbonMetastore .dropTable(storePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") + } catch { + case ex: Exception => + LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") } finally { - if (carbonLock != null && isLocked) { - if (carbonLock.unlock()) { - logInfo("Table MetaData Unlocked Successfully after dropping the table") - // deleting any remaining files. - val metadataFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath - val fileType = FileFactory.getFileType(metadataFilePath) - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFiles(file.getParentFile) - } + if (carbonLocks.nonEmpty) { + val unlocked = carbonLocks.forall(_.unlock()) + if (unlocked) { + logInfo("Table MetaData Unlocked Successfully") + } + // deleting any remaining files. 
+ val metadataFilePath = CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataFilePath) + if (FileFactory.isFileExist(metadataFilePath, fileType)) { + val file = FileFactory.getCarbonFile(metadataFilePath, fileType) + CarbonUtil.deleteFoldersAndFiles(file.getParentFile) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 9e402cd..87717fb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock} +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonStorePath @@ -65,7 +65,7 @@ object AlterTableUtil { val acquiredLocks = ListBuffer[ICarbonLock]() try { locksToBeAcquired.foreach { lock => - acquiredLocks += getLockObject(table, lock) + acquiredLocks += CarbonLockUtil.getLockObject(table, lock) } acquiredLocks.toList } catch { @@ -76,27 +76,6 @@ object AlterTableUtil { } /** - * Given a lock type this method will return a new lock object if not acquired by any other - * operation - * - * @param carbonTable 
- * @param lockType - * @return - */ - private def getLockObject(carbonTable: CarbonTable, - lockType: String): ICarbonLock = { - val carbonLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - lockType) - if (carbonLock.lockWithRetries()) { - LOGGER.info(s"Successfully acquired the lock $lockType") - } else { - sys.error("Table is locked for updation. Please try after some time") - } - carbonLock - } - - /** * This method will release the locks acquired for an operation * * @param locks