Repository: carbondata Updated Branches: refs/heads/master 545e93db3 -> 414ea7730
[CARBONDATA-1229] restrict drop when loading is in progress This PR will restrict the table from getting dropped if a load is in progress. This closes #1168 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/414ea773 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/414ea773 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/414ea773 Branch: refs/heads/master Commit: 414ea7730d418e6580bb3f8f52e19812e92e3620 Parents: 545e93d Author: kunal642 <[email protected]> Authored: Thu Jul 13 00:55:56 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Aug 3 00:09:28 2017 +0800 ---------------------------------------------------------------------- .../execution/command/carbonTableSchema.scala | 32 ++++++++++---------- .../execution/command/carbonTableSchema.scala | 23 ++++++++++++-- 2 files changed, 36 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/414ea773/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 9a02d13..00dfaec 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command import java.io.File import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.commons.lang3.StringUtils @@ -41,7 +42,7 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} @@ -767,27 +768,26 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) val identifier = TableIdentifier(tableName, Option(dbName)) val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") - val carbonLock = CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName + CarbonCommonConstants.UNDERSCORE + - LockUsage.DROP_TABLE_LOCK) - val storePath = CarbonEnv.get.carbonMetastore.storePath - var isLocked = false + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) + val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() + val catalog = CarbonEnv.get.carbonMetastore + val storePath = catalog.storePath try { - isLocked = carbonLock.lockWithRetries() - if (isLocked) { - logInfo("Successfully able to get the lock for drop.") - } - else { - LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked") - sys.error("Table is locked for deletion. 
Please try after some time") + locksToBeAcquired foreach { + lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock) } LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") + } catch { + case ex: Exception => + LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") + sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}") } finally { - if (carbonLock != null && isLocked) { - if (carbonLock.unlock()) { - logInfo("Table MetaData Unlocked Successfully after dropping the table") + if (carbonLocks.nonEmpty) { + val unlocked = carbonLocks.forall(_.unlock()) + if (unlocked) { + logInfo("Table MetaData Unlocked Successfully") // deleting any remaining files. val metadataFilePath = CarbonStorePath .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath http://git-wip-us.apache.org/repos/asf/carbondata/blob/414ea773/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index d34b91d..80b1436 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -36,6 +36,7 @@ import org.codehaus.jackson.map.ObjectMapper import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import 
org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer @@ -874,29 +875,45 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, val identifier = TableIdentifier(tableName, Option(dbName)) val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore + val carbonEnv = CarbonEnv.getInstance(sparkSession) + val catalog = carbonEnv.carbonMetastore + val storePath = carbonEnv.storePath val tableIdentifier = AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName.toLowerCase, tableName.toLowerCase) catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { - locksToBeAcquired foreach { + locksToBeAcquired foreach { lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock) } LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") - + val carbonTable = catalog + .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation].metaData.carbonTable + if (null != carbonTable) { + // clear driver B-tree and dictionary cache + ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) + } CarbonEnv.getInstance(sparkSession).carbonMetastore .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") } catch { case ex: Exception => LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") + sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}") } finally { if (carbonLocks.nonEmpty) { val unlocked = carbonLocks.forall(_.unlock()) if (unlocked) { logInfo("Table MetaData 
Unlocked Successfully") + // deleting any remaining files. + val metadataFilePath = CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataFilePath) + if (FileFactory.isFileExist(metadataFilePath, fileType)) { + val file = FileFactory.getCarbonFile(metadataFilePath, fileType) + CarbonUtil.deleteFoldersAndFiles(file.getParentFile) + } } } }
