[CARBONDATA-1284] Implement Hive-based schema storage in Carbon. This closes #1149
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/042a05a5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/042a05a5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/042a05a5 Branch: refs/heads/master Commit: 042a05a58cec223086ad55ad48ca27e34c40d135 Parents: 92fe63c Author: ravipesala <[email protected]> Authored: Sat Jul 8 17:14:04 2017 +0530 Committer: Raghunandan S <[email protected]> Committed: Thu Jul 27 18:47:52 2017 +0800 ---------------------------------------------------------------------- .../execution/command/carbonTableSchema.scala | 30 +- .../spark/sql/hive/CarbonFileMetastore.scala | 384 ++++++++++--------- 2 files changed, 225 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 46b58c5..1781477 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -233,10 +233,10 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru sparkSession.sql( s"""CREATE TABLE $dbName.$tbName - |(${ fields.map(f => f.rawSchema).mkString(",") }) - |USING org.apache.spark.sql.CarbonSource""".stripMargin + - s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + - s""""$tablePath" $carbonSchemaString) """) + |(${ fields.map(f => f.rawSchema).mkString(",") }) + |USING 
org.apache.spark.sql.CarbonSource""".stripMargin + + s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + + s""""$tablePath"$carbonSchemaString) """) } catch { case e: Exception => val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) @@ -268,8 +268,8 @@ case class DeleteLoadsById( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadById( loadids, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -293,8 +293,8 @@ case class DeleteLoadsByLoadDate( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
+ tableMeta.carbonTable CarbonStore.deleteLoadByDate( loadDate, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -750,11 +750,11 @@ case class LoadTable( (dataFrame, dataFrame) } - GlobalDictionaryUtil.generateGlobalDictionary( - sparkSession.sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - dictionaryDataFrame) + GlobalDictionaryUtil.generateGlobalDictionary( + sparkSession.sqlContext, + carbonLoadModel, + relation.tableMeta.storePath, + dictionaryDataFrame) CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, @@ -847,8 +847,8 @@ case class ShowLoads( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
+ tableMeta.carbonTable CarbonStore.showSegments( getDB.getDatabaseName(databaseNameOp, sparkSession), tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/042a05a5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 2407054..048681c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -22,23 +22,22 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession} +import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.impl.FileFactory.FileType import org.apache.carbondata.core.fileoperations.FileWriteOperation import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema -import 
org.apache.carbondata.core.metadata.schema.table import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -63,7 +62,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { +class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore { @transient val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -78,7 +77,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { System.nanoTime() + "" } - val metadata = MetaData(new ArrayBuffer[TableMeta]()) + lazy val metadata = loadMetadata(storePath, nextQueryId) /** @@ -91,22 +90,9 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { override def createCarbonRelation(parameters: Map[String, String], absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { - val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName - val tableName = absIdentifier.getCarbonTableIdentifier.getTableName - val tables = getTableFromMetadataCache(database, tableName) - tables match { - case Some(t) => - CarbonRelation(database, tableName, - CarbonSparkUtil.createSparkMeta(t.carbonTable), t) - case None => - readCarbonSchema(absIdentifier) match { - case Some(meta) => - CarbonRelation(database, tableName, - CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta) - case None => - throw new NoSuchTableException(database, tableName) - } - } + 
lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName, + Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession) + .asInstanceOf[CarbonRelation] } def lookupRelation(dbName: Option[String], tableName: String) @@ -114,21 +100,20 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } - override def lookupRelation(tableIdentifier: TableIdentifier) + def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { + checkSchemasModifiedTimeAndReloadTables() val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase) - val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { - case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => - carbonDatasourceHadoopRelation.carbonRelation - case LogicalRelation( - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - carbonDatasourceHadoopRelation.carbonRelation - case _ => throw new NoSuchTableException(database, tableIdentifier.table) + sparkSession.catalog.currentDatabase + ) + val tables = getTableFromMetadata(database, tableIdentifier.table, true) + tables match { + case Some(t) => + CarbonRelation(database, tableIdentifier.table, + CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head) + case None => + throw new NoSuchTableException(database, tableIdentifier.table) } - relation } /** @@ -138,7 +123,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * @param tableName * @return */ - def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = { + def getTableFromMetadata(database: String, + tableName: String, readStore: Boolean = false): Option[TableMeta] = { metadata.tablesMeta .find(c => 
c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) @@ -150,48 +136,99 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { tableExists(TableIdentifier(table, databaseOp))(sparkSession) } - override def tableExists(tableIdentifier: TableIdentifier) - (sparkSession: SparkSession): Boolean = { - try { - lookupRelation(tableIdentifier)(sparkSession) - } catch { - case e: Exception => - return false + def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { + checkSchemasModifiedTimeAndReloadTables() + val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + val tables = metadata.tablesMeta.filter( + c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && + c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table)) + tables.nonEmpty + } + + def loadMetadata(metadataPath: String, queryId: String): MetaData = { + val recorder = CarbonTimeStatisticsFactory.createDriverRecorder() + val statistic = new QueryStatistic() + // creating zookeeper instance once. + // if zookeeper is configured as carbon lock type. 
+ val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) + if (null != zookeeperurl) { + CarbonProperties.getInstance + .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) + } + if (metadataPath == null) { + return null + } + // if no locktype is configured and store type is HDFS set HDFS lock as default + if (null == CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.LOCK_TYPE) && + FileType.HDFS == FileFactory.getFileType(metadataPath)) { + CarbonProperties.getInstance + .addProperty(CarbonCommonConstants.LOCK_TYPE, + CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS + ) + LOGGER.info("Default lock type HDFSLOCK is configured") } - true + val fileType = FileFactory.getFileType(metadataPath) + val metaDataBuffer = new ArrayBuffer[TableMeta] + fillMetaData(metadataPath, fileType, metaDataBuffer) + updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) + statistic.addStatistics(QueryStatisticsConstants.LOAD_META, + System.currentTimeMillis()) + recorder.recordStatisticsForDriver(statistic, queryId) + MetaData(metaDataBuffer) } - private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = { - val dbName = identifier.getCarbonTableIdentifier.getDatabaseName - val tableName = identifier.getCarbonTableIdentifier.getTableName - val storePath = identifier.getStorePath - val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(), - tableName.toLowerCase(), UUID.randomUUID().toString) - val carbonTablePath = - CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val fileType = FileFactory.getFileType(tableMetadataFile) - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableUniqueName = dbName + "_" + tableName - val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - 
.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier, - identifier.getStorePath, - identifier.getTablePath, - carbonTable) - metadata.tablesMeta += tableMeta - Some(tableMeta) - } else { - None + private def fillMetaData(basePath: String, fileType: FileType, + metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { + val databasePath = basePath // + "/schemas" + try { + if (FileFactory.isFileExist(databasePath, fileType)) { + val file = FileFactory.getCarbonFile(databasePath, fileType) + val databaseFolders = file.listFiles() + + databaseFolders.foreach(databaseFolder => { + if (databaseFolder.isDirectory) { + val dbName = databaseFolder.getName + val tableFolders = databaseFolder.listFiles() + + tableFolders.foreach(tableFolder => { + if (tableFolder.isDirectory) { + val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName, + tableFolder.getName, UUID.randomUUID().toString) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, + carbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val tableName = tableFolder.getName + val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName + val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) + val schemaFilePath = 
CarbonStorePath + .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath + wrapperTableInfo.setStorePath(storePath) + wrapperTableInfo + .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, + carbonTable) + } + } + }) + } + }) + } else { + // Create folders and files. + FileFactory.mkdirs(databasePath, fileType) + } + } catch { + case s: java.io.FileNotFoundException => + s.printStackTrace() + // Create folders and files. + FileFactory.mkdirs(databasePath, fileType) } } @@ -201,15 +238,15 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * @param newTableIdentifier * @param thriftTableInfo * @param schemaEvolutionEntry - * @param tablePath + * @param carbonStorePath * @param sparkSession */ def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, - tablePath: String) (sparkSession: SparkSession): String = { - val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + carbonStorePath: String) + (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) @@ -218,19 +255,11 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { .fromExternalToWrapperTableInfo(thriftTableInfo, newTableIdentifier.getDatabaseName, newTableIdentifier.getTableName, - absoluteTableIdentifier.getStorePath) - val identifier = - new CarbonTableIdentifier(newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName, - 
wrapperTableInfo.getFactTable.getTableId) - val path = createSchemaThriftFile(wrapperTableInfo, + carbonStorePath) + createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - identifier) - addTableCache(wrapperTableInfo, - AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath, newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName)) - path + newTableIdentifier.getTableName)(sparkSession) } /** @@ -238,27 +267,25 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * * @param carbonTableIdentifier * @param thriftTableInfo - * @param tablePath + * @param carbonStorePath * @param sparkSession */ def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, - tablePath: String)(sparkSession: SparkSession): String = { - val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + carbonStorePath: String) + (sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName, carbonTableIdentifier.getTableName, - tableIdentifier.getStorePath) + carbonStorePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - wrapperTableInfo.setStorePath(tableIdentifier.getStorePath) - val path = createSchemaThriftFile(wrapperTableInfo, + createSchemaThriftFile(wrapperTableInfo, thriftTableInfo, - tableIdentifier.getCarbonTableIdentifier) - addTableCache(wrapperTableInfo, tableIdentifier) - path + carbonTableIdentifier.getDatabaseName, + carbonTableIdentifier.getTableName)(sparkSession) } @@ -269,38 +296,24 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * Load CarbonTable from wrapper tableInfo * */ - def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) { + def 
createTableFromThrift( + tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, + dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = { + if (tableExists(tableName, Some(dbName))(sparkSession)) { + sys.error(s"Table [$tableName] already exists under Database [$dbName]") + } + val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) val schemaConverter = new ThriftWrapperSchemaConverterImpl - val dbName = tableInfo.getDatabaseName - val tableName = tableInfo.getFactTable.getTableName val thriftTableInfo = schemaConverter .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) - tableInfo.setStorePath(identifier.getStorePath) - createSchemaThriftFile(tableInfo, + thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history + .add(schemaEvolutionEntry) + val carbonTablePath = createSchemaThriftFile(tableInfo, thriftTableInfo, - identifier.getCarbonTableIdentifier) + dbName, + tableName)(sparkSession) LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - } - - /** - * Generates schema string from TableInfo - */ - override def generateTableSchemaString(tableInfo: schema.table.TableInfo, - tablePath: String): String = { - val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier) - val schemaMetadataPath = - CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(tableIdentifier.getStorePath) - val schemaEvolutionEntry = new schema.SchemaEvolutionEntry - schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime) - tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry) - removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName) - 
CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - addTableCache(tableInfo, tableIdentifier) - CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",") + (carbonTablePath, "") } /** @@ -308,16 +321,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * * @param tableInfo * @param thriftTableInfo + * @param dbName + * @param tableName + * @param sparkSession * @return */ - private def createSchemaThriftFile(tableInfo: schema.table.TableInfo, - thriftTableInfo: TableInfo, - carbonTableIdentifier: CarbonTableIdentifier): String = { - val carbonTablePath = CarbonStorePath. - getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier) + private def createSchemaThriftFile( + tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + dbName: String, tableName: String) + (sparkSession: SparkSession): String = { + val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, + tableInfo.getFactTable.getTableId) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) val schemaFilePath = carbonTablePath.getSchemaFilePath val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) tableInfo.setMetaDataFilepath(schemaMetadataPath) + tableInfo.setStorePath(storePath) val fileType = FileFactory.getFileType(schemaMetadataPath) if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { FileFactory.mkdirs(schemaMetadataPath, fileType) @@ -326,20 +346,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { thriftWriter.open(FileWriteOperation.OVERWRITE) thriftWriter.write(thriftTableInfo) thriftWriter.close() - updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath)) - carbonTablePath.getPath - } - - protected def addTableCache(tableInfo: table.TableInfo, - absoluteTableIdentifier: AbsoluteTableIdentifier) = { - val identifier = 
absoluteTableIdentifier.getCarbonTableIdentifier - CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName) - removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName) + removeTableFromMetadata(dbName, tableName) CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath, - absoluteTableIdentifier.getTablePath, - CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)) + val tableMeta = new TableMeta(carbonTableIdentifier, storePath, + CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)) metadata.tablesMeta += tableMeta + updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) + carbonTablePath.getPath } /** @@ -349,15 +362,13 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { * @param tableName */ def removeTableFromMetadata(dbName: String, tableName: String): Unit = { - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName) + val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName) metadataToBeRemoved match { case Some(tableMeta) => metadata.tablesMeta -= tableMeta CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) case None => - if (LOGGER.isDebugEnabled) { - LOGGER.debug(s"No entry for table $tableName in database $dbName") - } + LOGGER.debug(s"No entry for table $tableName in database $dbName") } } @@ -391,23 +402,23 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - try { - val tablePath = lookupRelation(tableIdentifier)(sparkSession). 
- asInstanceOf[CarbonRelation].tableMeta.tablePath - val fileType = FileFactory.getFileType(tablePath) - FileFactory.isFileExist(tablePath, fileType) - } catch { - case e: Exception => - false - } + val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + val tableName = tableIdentifier.table.toLowerCase + + val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, + new CarbonTableIdentifier(dbName, tableName, "")).getPath + + val fileType = FileFactory.getFileType(tablePath) + FileFactory.isFileExist(tablePath, fileType) } - def dropTable(tablePath: String, tableIdentifier: TableIdentifier) + def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) (sparkSession: SparkSession) { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table - val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) - val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath + + val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, + new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache @@ -418,18 +429,27 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { if (FileFactory.isFileExist(metadataFilePath, fileType)) { // while drop we should refresh the schema modified time so that if any thing has changed // in the other beeline need to update. 
- checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath) - - removeTableFromMetadata(dbName, tableName) - updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath)) + checkSchemasModifiedTimeAndReloadTables + val file = FileFactory.getCarbonFile(metadataFilePath, fileType) + CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) + val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, + tableIdentifier.table) + metadataToBeRemoved match { + case Some(tableMeta) => + metadata.tablesMeta -= tableMeta + CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) + updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) + case None => + LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName") + } CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier) } } - private def getTimestampFileAndType(basePath: String) = { - val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE + private def getTimestampFileAndType(databaseName: String, tableName: String) = { + val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE val timestampFileType = FileFactory.getFileType(timestampFile) (timestampFile, timestampFileType) } @@ -443,20 +463,37 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { tableModifiedTimeStore.put("default", timeStamp) } - def updateAndTouchSchemasUpdatedTime(basePath: String) { - updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath)) + def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) { + updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName)) } + /** + * This method will read the timestamp of empty schema file + * + * @param databaseName + * @param tableName + * @return + */ + 
private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = { + val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) + if (FileFactory.isFileExist(timestampFile, timestampFileType)) { + FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime + } else { + System.currentTimeMillis() + } + } /** * This method will check and create an empty schema timestamp file * + * @param databaseName + * @param tableName * @return */ - private def touchSchemaFileSystemTime(basePath: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath) + private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = { + val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.audit(s"Creating timestamp file for $basePath") + LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName") FileFactory.createNewFile(timestampFile, timestampFileType) } val systemTime = System.currentTimeMillis() @@ -465,9 +502,8 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { systemTime } - def checkSchemasModifiedTimeAndReloadTables(storePath: String) { - val (timestampFile, timestampFileType) = - getTimestampFileAndType(storePath) + def checkSchemasModifiedTimeAndReloadTables() { + val (timestampFile, timestampFileType) = getTimestampFileAndType("", "") if (FileFactory.isFileExist(timestampFile, timestampFileType)) { if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType). getLastModifiedTime == @@ -478,7 +514,7 @@ class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore { } private def refreshCache() { - metadata.tablesMeta.clear() + metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta } override def isReadFromHiveMetaStore: Boolean = false
