Repository: incubator-carbondata
Updated Branches:
  refs/heads/master c693001ef -> 617b69aac


[CARBONDATA-320] If a table is dropped while the datanodes are down, the Hive
metastore entry is removed but the table's files are not deleted.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8389f0eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8389f0eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8389f0eb

Branch: refs/heads/master
Commit: 8389f0ebcf9d1389a444346962ec9b0bc391f70a
Parents: c693001
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Mon Oct 17 15:19:19 2016 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Oct 18 00:05:58 2016 +0530

----------------------------------------------------------------------
 .../execution/command/carbonTableSchema.scala   | 120 ++++++-------------
 .../spark/sql/hive/CarbonMetastoreCatalog.scala |  60 +++++-----
 .../spark/sql/hive/CarbonStrategies.scala       |   4 +-
 .../apache/carbondata/lcm/locks/LockUsage.java  |   1 +
 4 files changed, 71 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8389f0eb/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 7b6a213..c265f7f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,13 +40,14 @@ import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, 
SchemaEvolutionEntry}
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
 import 
org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
 ColumnSchema}
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.load.LoadMetadataDetails
@@ -59,7 +60,6 @@ import 
org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, 
DataTypeConverterUtil, GlobalDictionaryUtil}
 
@@ -875,18 +875,11 @@ case class CreateTable(cm: tableModel) extends 
RunnableCommand {
               .collect
       } catch {
         case e: Exception =>
-
           val identifier: TableIdentifier = TableIdentifier(tbName, 
Some(dbName))
-          val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-            
.lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
-          if (relation != null) {
-            LOGGER.audit(s"Deleting Table [$tbName] under Database [$dbName]" +
-                         "as create TABLE failed")
-            CarbonEnv.getInstance(sqlContext).carbonCatalog
-              .dropTable(relation.tableMeta.partitioner.partitionCount,
-                relation.tableMeta.storePath,
-                identifier)(sqlContext)
-          }
+          // call the drop table to delete the created table.
+
+          CarbonEnv.getInstance(sqlContext).carbonCatalog
+            .dropTable(catalog.storePath, identifier)(sqlContext)
 
           LOGGER.audit(s"Table creation with Database name [$dbName] " +
             s"and Table name [$tbName] failed")
@@ -1234,81 +1227,46 @@ private[sql] case class DropTableCommand(ifExistsSet: 
Boolean, databaseNameOp: O
     val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val tmpTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + 
tableName)
-    if (null == tmpTable) {
-      if (!ifExistsSet) {
-        LOGGER
-          .audit(s"Dropping carbon table with Database name [$dbName] and 
Table name" +
-                 "[$tableName] failed")
-        LOGGER.error(s"Carbon Table $dbName.$tableName metadata does not 
exist")
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, 
"")
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
+    val storePath = CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath
+    try {
+      if (carbonLock.lockWithRetries()) {
+        logInfo("Successfully able to get the lock for drop.")
       }
-      if (sqlContext.tableNames(dbName).map(x => x.toLowerCase())
-        .contains(tableName.toLowerCase())) {
-          CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sqlContext)
-      } else if (!ifExistsSet) {
-        sys.error(s"Carbon Table $dbName.$tableName does not exist")
+      else {
+        LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table 
is locked")
+        sys.error("Table is locked for deletion. Please try after some time")
       }
-    } else {
-      CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", 
"false")
-      val carbonLock = CarbonLockFactory
-        
.getCarbonLockObj(tmpTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.METADATA_LOCK
-        )
-      try {
-        if (carbonLock.lockWithRetries()) {
-          logInfo("Successfully able to get the table metadata file lock")
-        } else {
-          LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table 
is locked")
-          sys.error("Table is locked for updation. Please try after some time")
-        }
-
-        val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
-
-        if (relation == null) {
-          if (!ifExistsSet) {
-            sys.error(s"Table $dbName.$tableName does not exist")
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+      CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, 
identifier)(sqlContext)
+      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+    }
+    finally {
+      if (carbonLock != null) {
+        if (carbonLock.unlock()) {
+          logInfo("Table MetaData Unlocked Successfully after dropping the 
table")
+          // deleting any remaining files.
+          val metadataFilePath = CarbonStorePath
+            .getCarbonTablePath(storePath, 
carbonTableIdentifier).getMetadataDirectoryPath
+          val fileType = FileFactory.getFileType(metadataFilePath)
+          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
-        } else {
-          LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-
-          CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .dropTable(relation.tableMeta.partitioner.partitionCount,
-              relation.tableMeta.storePath,
-              
TableIdentifier(relation.tableMeta.carbonTableIdentifier.getTableName,
-                Some(relation.tableMeta.carbonTableIdentifier.getDatabaseName))
-              )(sqlContext)
-          CarbonDataRDDFactory
-            .dropTable(sqlContext.sparkContext, dbName, tableName,
-              relation.tableMeta.partitioner)
-          QueryPartitionHelper.getInstance().removePartition(dbName, tableName)
-
-          LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
-        }
-      }
-      finally {
-        if (carbonLock != null) {
-          if (carbonLock.unlock()) {
-            logInfo("Table MetaData Unlocked Successfully after dropping the 
table")
-            val fileType = FileFactory.getFileType(tmpTable 
.getMetaDataFilepath)
-            if (FileFactory.isFileExist(tmpTable .getMetaDataFilepath, 
fileType)) {
-              val file = FileFactory.getCarbonFile(tmpTable 
.getMetaDataFilepath, fileType)
-              CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
-            }
-            // delete bad record log after drop table
-            val badLogPath = CarbonUtil.getBadLogPath(dbName +  File.separator 
+ tableName)
-            val badLogFileType = FileFactory.getFileType(badLogPath)
-            if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-              val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-              CarbonUtil.deleteFoldersAndFiles(file)
-            }
-          } else {
-            logError("Unable to unlock Table MetaData")
+          // delete bad record log after drop table
+          val badLogPath = CarbonUtil.getBadLogPath(dbName +  File.separator + 
tableName)
+          val badLogFileType = FileFactory.getFileType(badLogPath)
+          if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
+            val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
+            CarbonUtil.deleteFoldersAndFiles(file)
           }
+        } else {
+          logError("Unable to unlock Table MetaData")
         }
       }
     }
-
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8389f0eb/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 8b05629..7b3fea9 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -412,46 +412,44 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, 
val storePath: String,
     }
   }
 
-  def dropTable(partitionCount: Int, tableStorePath: String, tableIdentifier: 
TableIdentifier)
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: 
SQLContext): Boolean = {
+    val dbName = 
tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val tableName = tableIdentifier.table
+
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+    val fileType = FileFactory.getFileType(tablePath)
+    FileFactory.isFileExist(tablePath, fileType)
+  }
+
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
     (sqlContext: SQLContext) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-    if (!tableExists(tableIdentifier)(sqlContext)) {
-      LOGGER.audit(s"Drop Table failed. Table with $dbName.$tableName does not 
exist")
-      sys.error(s"Table with $dbName.$tableName does not exist")
-    }
 
-    val carbonTable = 
org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(dbName + "_" + tableName)
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, 
"")).getMetadataDirectoryPath
 
-    if (null != carbonTable) {
-      val metadatFilePath = carbonTable.getMetaDataFilepath
-      val fileType = FileFactory.getFileType(metadatFilePath)
+    val fileType = FileFactory.getFileType(metadataFilePath)
 
-      if (FileFactory.isFileExist(metadatFilePath, fileType)) {
-        val file = FileFactory.getCarbonFile(metadatFilePath, fileType)
-        CarbonUtil.renameTableForDeletion(partitionCount, tableStorePath, 
dbName, tableName)
-        CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-      }
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
 
-      val partitionLocation = tableStorePath + File.separator + "partition" + 
File.separator +
-                              dbName + File.separator + tableName
-      val partitionFileType = FileFactory.getFileType(partitionLocation)
-      if (FileFactory.isFileExist(partitionLocation, partitionFileType)) {
-        CarbonUtil
-          .deleteFoldersAndFiles(FileFactory.getCarbonFile(partitionLocation, 
partitionFileType))
-      }
-    }
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
 
-    metadata.tablesMeta -= metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
-    org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .removeTable(dbName + "_" + tableName)
-    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sqlContext)
+      metadata.tablesMeta -= metadata.tablesMeta.filter(
+        c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) 
&&
+             
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
+      org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+        .removeTable(dbName + "_" + tableName)
 
-    // discard cached table info in cachedDataSourceTables
-    sqlContext.catalog.refreshTable(tableIdentifier)
+      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, 
sqlContext)
+      updateSchemasUpdatedTime(dbName, tableName)
+
+      // discard cached table info in cachedDataSourceTables
+      sqlContext.catalog.refreshTable(tableIdentifier)
+    }
   }
 
   private def getTimestampFileAndType(databaseName: String, tableName: String) 
= {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8389f0eb/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 4817612..1a31229 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -220,8 +220,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
   object DDLStrategies extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case DropTable(tableName, ifNotExists)
-        if CarbonEnv.getInstance(sqlContext).carbonCatalog
-            .tableExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) 
=>
+        if (CarbonEnv.getInstance(sqlContext).carbonCatalog
+            
.isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext)) =>
         val identifier = toTableIdentifier(tableName.toLowerCase)
         ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, 
identifier.table)) :: Nil
       case ShowLoadsCommand(databaseName, table, limit) =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8389f0eb/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java 
b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
index b11951b..9b84042 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
@@ -30,5 +30,6 @@ public class LockUsage {
   public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
   public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
   public static final String CLEAN_FILES_LOCK = "clean_files.lock";
+  public static final String DROP_TABLE_LOCK = "droptable.lock";
 
 }

Reply via email to the mailing list.