Repository: carbondata Updated Branches: refs/heads/metadata 2a9debfcc -> c7f0b1081
[CARBONDATA-1301] change command to update schema and data separately This closes #1160 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c7f0b108 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c7f0b108 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c7f0b108 Branch: refs/heads/metadata Commit: c7f0b1081c6055970458712bf29d874fe95c7004 Parents: 2a9debf Author: jackylk <[email protected]> Authored: Wed Jul 12 00:40:44 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Wed Jul 12 21:07:14 2017 +0530 ---------------------------------------------------------------------- .../sql/execution/command/IUDCommands.scala | 146 +++---- .../execution/command/carbonTableSchema.scala | 414 ++++++++++--------- .../spark/sql/hive/CarbonFileMetastore.scala | 3 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 11 - 4 files changed, 296 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala index 2c1de52..d3a80d4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala @@ -58,31 +58,35 @@ import org.apache.carbondata.spark.util.QueryPlanUtil private[sql] case class ProjectForDeleteCommand( plan: LogicalPlan, identifier: Seq[String], - timestamp: String) extends RunnableCommand { + timestamp: String) extends RunnableCommand with DataProcessCommand { - val LOG = LogServiceFactory.getLogService(this.getClass.getName) var horizontalCompactionFailed = false override def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val dataFrame = Dataset.ofRows(sparkSession, plan) -// dataFrame.show(truncate = false) -// dataFrame.collect().foreach(println) + // dataFrame.show(truncate = false) + // dataFrame.collect().foreach(println) val dataRdd = dataFrame.rdd val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession). - asInstanceOf[CarbonRelation] + .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession). + asInstanceOf[CarbonRelation] val carbonTable = relation.tableMeta.carbonTable val metadataLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) var lockStatus = false try { lockStatus = metadataLock.lockWithRetries() - LOG.audit(s" Delete data request has been received " + - s"for ${ relation.databaseName }.${ relation.tableName }.") + LOGGER.audit(s" Delete data request has been received " + + s"for ${ relation.databaseName }.${ relation.tableName }.") if (lockStatus) { - LOG.info("Successfully able to get the table metadata file lock") + LOGGER.info("Successfully able to get the table metadata file lock") } else { throw new Exception("Table is locked for deletion. Please try after some time") @@ -92,23 +96,23 @@ private[sql] case class ProjectForDeleteCommand( carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) var executorErrors = new ExecutionErrors(FailureCauses.NONE, "") - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) + // handle the clean up of IUD. + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) - if (deleteExecution - .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation, - false, executorErrors)) { - // call IUD Compaction. - IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false) - } + if (deleteExecution + .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation, + false, executorErrors)) { + // call IUD Compaction. + IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false) + } } catch { case e: HorizontalCompactionException => - LOG.error("Delete operation passed. Exception in Horizontal Compaction." + - " Please check logs. " + e.getMessage) - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) + LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." + + " Please check logs. " + e.getMessage) + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) case e: Exception => - LOG.error(e, "Exception in Delete data operation " + e.getMessage) + LOGGER.error(e, "Exception in Delete data operation " + e.getMessage) // ****** start clean up. // In case of failure , clean all related delete delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) @@ -130,47 +134,51 @@ private[sql] case class ProjectForDeleteCommand( } private[sql] case class ProjectForUpdateCommand( - plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) + plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand + with DataProcessCommand { override def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + override def processData(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) - // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution + // sqlContext.sparkContext.setLocalProperty(org.apache.spark.sql.execution.SQLExecution // .EXECUTION_ID_KEY, null) // DataFrame(sqlContext, plan).show(truncate = false) // return Seq.empty val res = plan find { - case relation: LogicalRelation if (relation.relation - .isInstanceOf[CarbonDatasourceHadoopRelation]) => + case relation: LogicalRelation if relation.relation + .isInstanceOf[CarbonDatasourceHadoopRelation] => true case _ => false } - if (!res.isDefined) { + if (res.isEmpty) { return Seq.empty } val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession). - asInstanceOf[CarbonRelation] -// val relation = CarbonEnv.get.carbonMetastore -// .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). -// asInstanceOf[CarbonRelation] + .lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession). + asInstanceOf[CarbonRelation] + // val relation = CarbonEnv.get.carbonMetastore + // .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). + // asInstanceOf[CarbonRelation] val carbonTable = relation.tableMeta.carbonTable val metadataLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) + .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) var lockStatus = false // get the current time stamp which should be same for delete and update. val currentTime = CarbonUpdateUtil.readCurrentTime -// var dataFrame: DataFrame = null + // var dataFrame: DataFrame = null var dataSet: DataFrame = null val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.isPersistEnabled, - CarbonCommonConstants.defaultValueIsPersistEnabled) - var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean + .getProperty(CarbonCommonConstants.isPersistEnabled, + CarbonCommonConstants.defaultValueIsPersistEnabled) + var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean if (isPersistEnabledUserValue.equalsIgnoreCase("false")) { isPersistEnabled = false } @@ -188,48 +196,46 @@ private[sql] case class ProjectForUpdateCommand( val tablePath = CarbonStorePath.getCarbonTablePath( carbonTable.getStorePath, carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) - // Get RDD. + // Get RDD. dataSet = if (isPersistEnabled) { - Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK) -// DataFrame(sqlContext, plan) -// .persist(StorageLevel.MEMORY_AND_DISK) - } - else { - Dataset.ofRows(sparkSession, plan) -// DataFrame(sqlContext, plan) - } - var executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK) + // DataFrame(sqlContext, plan) + // .persist(StorageLevel.MEMORY_AND_DISK) + } + else { + Dataset.ofRows(sparkSession, plan) + // DataFrame(sqlContext, plan) + } + var executionErrors = new ExecutionErrors(FailureCauses.NONE, "") - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) + // handle the clean up of IUD. + CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) - // do delete operation. - deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd, - currentTime + "", + // do delete operation. + deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd, + currentTime + "", relation, isUpdateOperation = true, executionErrors) - if(executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } - - // do update operation. - UpdateExecution.performUpdate(dataSet, tableIdentifier, plan, - sparkSession, currentTime, executionErrors) + if(executionErrors.failureCauses != FailureCauses.NONE) { + throw new Exception(executionErrors.errorMsg) + } - if(executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } + // do update operation. + UpdateExecution.performUpdate(dataSet, tableIdentifier, plan, + sparkSession, currentTime, executionErrors) - // Do IUD Compaction. - IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true) - } + if(executionErrors.failureCauses != FailureCauses.NONE) { + throw new Exception(executionErrors.errorMsg) + } - catch { + // Do IUD Compaction. + IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true) + } catch { case e: HorizontalCompactionException => LOGGER.error( - "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) + "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) // In case of failure , clean all related delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 88e89ad..cc18fa3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -72,18 +72,36 @@ object Checker { } /** + * Interface for command that modifies schema + */ +trait SchemaProcessCommand { + def processSchema(sparkSession: SparkSession): Seq[Row] +} + +/** + * Interface for command that need to process data in file system + */ +trait DataProcessCommand { + def processData(sparkSession: SparkSession): Seq[Row] +} + +/** * Command for show table partitions Command * * @param tableIdentifier */ private[sql] case class ShowCarbonPartitionsCommand( - tableIdentifier: TableIdentifier) extends RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) + tableIdentifier: TableIdentifier) extends RunnableCommand with SchemaProcessCommand { + override val output = CommonUtil.partitionInfoOutput override def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(tableIdentifier)(sparkSession). - asInstanceOf[CarbonRelation] + .lookupRelation(tableIdentifier)(sparkSession). + asInstanceOf[CarbonRelation] val carbonTable = relation.tableMeta.carbonTable var tableName = carbonTable.getFactTableName var partitionInfo = carbonTable.getPartitionInfo( @@ -94,6 +112,7 @@ private[sql] case class ShowCarbonPartitionsCommand( } var partitionType = partitionInfo.getPartitionType var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName + val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) LOGGER.info("partition column name:" + columnName) CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) } @@ -104,17 +123,22 @@ private[sql] case class ShowCarbonPartitionsCommand( * * @param alterTableModel */ -case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand { +case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand + with DataProcessCommand { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { val tableName = alterTableModel.tableName.toLowerCase val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(databaseName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] + .lookupRelation(Option(databaseName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] if (relation == null) { sys.error(s"Table $databaseName.$tableName does not exist") } @@ -135,18 +159,18 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab carbonLoadModel.setStorePath(relation.tableMeta.storePath) var storeLocation = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, - System.getProperty("java.io.tmpdir") - ) + .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, + System.getProperty("java.io.tmpdir") + ) storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() try { CarbonDataRDDFactory - .alterTableForCompaction(sparkSession.sqlContext, - alterTableModel, - carbonLoadModel, - relation.tableMeta.storePath, - storeLocation - ) + .alterTableForCompaction(sparkSession.sqlContext, + alterTableModel, + carbonLoadModel, + relation.tableMeta.storePath, + storeLocation + ) } catch { case e: Exception => if (null != e.getMessage) { @@ -159,9 +183,19 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab } } -case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand { +case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand + with SchemaProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + def setV(ref: Any, name: String, value: Any): Unit = { + ref.getClass.getFields.find(_.getName == name).get + .set(ref, value.asInstanceOf[AnyRef]) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables() val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession) @@ -176,11 +210,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru } if (sparkSession.sessionState.catalog.listTables(dbName) - .exists(_.table.equalsIgnoreCase(tbName))) { + .exists(_.table.equalsIgnoreCase(tbName))) { if (!cm.ifNotExistsSet) { LOGGER.audit( s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " + - s"Table [$tbName] already exists under database [$dbName]") + s"Table [$tbName] already exists under database [$dbName]") sys.error(s"Table [$tbName] already exists under database [$dbName]") } } else { @@ -196,48 +230,43 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru sparkSession.sql( s"""CREATE TABLE $dbName.$tbName - |(${ fields.map(f => f.rawSchema).mkString(",") }) - |USING org.apache.spark.sql.CarbonSource""".stripMargin + - s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + - s""""$tablePath" $carbonSchemaString) """) + |(${ fields.map(f => f.rawSchema).mkString(",") }) + |USING org.apache.spark.sql.CarbonSource""".stripMargin + + s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + + s""""$tablePath" $carbonSchemaString) """) } catch { case e: Exception => val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) // call the drop table to delete the created table. CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(catalog.storePath, identifier)(sparkSession) + .dropTable(catalog.storePath, identifier)(sparkSession) LOGGER.audit(s"Table creation with Database name [$dbName] " + - s"and Table name [$tbName] failed") + s"and Table name [$tbName] failed") throw e } } LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]") } - Seq.empty } - - def setV(ref: Any, name: String, value: Any): Unit = { - ref.getClass.getFields.find(_.getName == name).get - .set(ref, value.asInstanceOf[AnyRef]) - } } case class DeleteLoadsById( loadids: Seq[String], databaseNameOp: Option[String], - tableName: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + tableName: String) extends RunnableCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadById( loadids, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -245,24 +274,24 @@ case class DeleteLoadsById( carbonTable ) Seq.empty - } - } case class DeleteLoadsByLoadDate( databaseNameOp: Option[String], tableName: String, dateField: String, - loadDate: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema") + loadDate: String) extends RunnableCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadByDate( loadDate, getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -271,7 +300,6 @@ case class DeleteLoadsByLoadDate( ) Seq.empty } - } object LoadTable { @@ -318,10 +346,13 @@ object LoadTable { } case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) - extends RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + extends RunnableCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { val df = Dataset.ofRows(sparkSession, child) val header = relation.tableSchema.get.fields.map(_.name).mkString(",") val load = LoadTable( @@ -335,7 +366,7 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo Some(df)).run(sparkSession) // updating relation metadata. This is in case of auto detect high cardinality relation.carbonRelation.metaData = - CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable) + CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable) load } } @@ -349,12 +380,10 @@ case class LoadTable( isOverwriteExist: Boolean = false, var inputSqlString: String = null, dataFrame: Option[DataFrame] = None, - updateModel: Option[UpdateTableModel] = None) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + updateModel: Option[UpdateTableModel] = None) extends RunnableCommand with DataProcessCommand { - private def getFinalOptions(carbonProperty: CarbonProperties): scala.collection - .mutable.Map[String, String] = { + private def getFinalOptions(carbonProperty: CarbonProperties): + scala.collection.mutable.Map[String, String] = { var optionsFinal = scala.collection.mutable.Map[String, String]() optionsFinal.put("delimiter", options.getOrElse("delimiter", ",")) optionsFinal.put("quotechar", options.getOrElse("quotechar", "\"")) @@ -415,14 +444,15 @@ case class LoadTable( case "false" => // when single_pass = false and if either alldictionarypath // or columnDict is configured the do not allow load - if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path").get) || - StringUtils.isNotEmpty(optionsFinal.get("columndict").get)) { + if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) || + StringUtils.isNotEmpty(optionsFinal("columndict"))) { throw new MalformedCarbonCommandException( "Can not use all_dictionary_path or columndict without single_pass.") } else { false } case illegal => + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " + "Please set it as 'true' or 'false'") false @@ -440,7 +470,12 @@ case class LoadTable( } def run(sparkSession: SparkSession): Seq[Row] = { - if (dataFrame.isDefined && !updateModel.isDefined) { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + if (dataFrame.isDefined && updateModel.isEmpty) { val rdd = dataFrame.get.rdd if (rdd.partitions == null || rdd.partitions.length == 0) { LOGGER.warn("DataLoading finished. No data was loaded.") @@ -452,8 +487,9 @@ case class LoadTable( if (isOverwriteExist) { sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName") } + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] if (relation == null) { sys.error(s"Table $dbName.$tableName does not exist") } @@ -467,10 +503,10 @@ case class LoadTable( carbonProperty.addProperty("zookeeper.enable.lock", "false") val optionsFinal = getFinalOptions(carbonProperty) val carbonLock = CarbonLockFactory - .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier - .getCarbonTableIdentifier, - LockUsage.METADATA_LOCK - ) + .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier + .getCarbonTableIdentifier, + LockUsage.METADATA_LOCK + ) try { // take lock only in case of normal data load. if (updateModel.isEmpty) { @@ -499,21 +535,21 @@ case class LoadTable( carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) val partitionLocation = relation.tableMeta.storePath + "/partition/" + - relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" + - relation.tableMeta.carbonTableIdentifier.getTableName + "/" + relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" + + relation.tableMeta.carbonTableIdentifier.getTableName + "/" val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean - val sort_scope = optionsFinal.get("sort_scope").get - val single_pass = optionsFinal.get("single_pass").get - val bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable").get - val bad_records_action = optionsFinal.get("bad_records_action").get - val bad_record_path = optionsFinal.get("bad_record_path").get - val global_sort_partitions = optionsFinal.get("global_sort_partitions").get - val dateFormat = optionsFinal.get("dateformat").get - val delimeter = optionsFinal.get("delimiter").get - val complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1").get - val complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2").get - val all_dictionary_path = optionsFinal.get("all_dictionary_path").get - val column_dict = optionsFinal.get("columndict").get + val sort_scope = optionsFinal("sort_scope") + val single_pass = optionsFinal("single_pass") + val bad_records_logger_enable = optionsFinal("bad_records_logger_enable") + val bad_records_action = optionsFinal("bad_records_action") + val bad_record_path = optionsFinal("bad_record_path") + val global_sort_partitions = optionsFinal("global_sort_partitions") + val dateFormat = optionsFinal("dateformat") + val delimeter = optionsFinal("delimiter") + val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1") + val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2") + val all_dictionary_path = optionsFinal("all_dictionary_path") + val column_dict = optionsFinal("columndict") if (sort_scope.equals("GLOBAL_SORT") && single_pass.equals("TRUE")) { sys.error("Global_Sort can't be used with single_pass flow") @@ -531,13 +567,13 @@ case class LoadTable( carbonLoadModel.setBadRecordsLocation(bad_record_path) ValidateUtil.validateGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get, "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get, "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get, "#")) + carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\")) + carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\"")) + carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#")) // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, // we should use table schema to generate file header. - var fileHeader = optionsFinal.get("fileheader").get + var fileHeader = optionsFinal("fileheader") val headerOption = options.get("header") if (headerOption.isDefined) { // whether the csv file has file header @@ -559,7 +595,7 @@ case class LoadTable( // generate file header if (fileHeader.isEmpty) { fileHeader = table.getCreateOrderColumn(table.getFactTableName) - .asScala.map(_.getColName).mkString(",") + .asScala.map(_.getColName).mkString(",") } } } @@ -572,21 +608,21 @@ case class LoadTable( CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel - .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + - optionsFinal.get("serialization_null_format").get) + .setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + + optionsFinal("serialization_null_format")) carbonLoadModel - .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) + .setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) carbonLoadModel - .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action) + .setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action) carbonLoadModel - .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + - optionsFinal.get("is_empty_data_bad_record").get) + .setIsEmptyDataBadRecord( + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + + optionsFinal("is_empty_data_bad_record")) carbonLoadModel.setSortScope(sort_scope) - carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb").get) + carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb")) carbonLoadModel.setGlobalSortPartitions(global_sort_partitions) carbonLoadModel.setUseOnePass(single_pass.toBoolean) if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || @@ -614,7 +650,7 @@ case class LoadTable( carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, - optionsFinal.get("maxcolumns").get) + optionsFinal("maxcolumns")) carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata val storePath = relation.tableMeta.storePath @@ -629,7 +665,7 @@ case class LoadTable( } // Create table and metadata folders if not exist val carbonTablePath = CarbonStorePath - .getCarbonTablePath(storePath, table.getCarbonTableIdentifier) + .getCarbonTablePath(storePath, table.getCarbonTableIdentifier) val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath val fileType = FileFactory.getFileType(metadataDirectoryPath) if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { @@ -638,9 +674,9 @@ case class LoadTable( if (carbonLoadModel.getUseOnePass) { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier - .getCarbonTableIdentifier + .getCarbonTableIdentifier val carbonTablePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier) + .getCarbonTablePath(storePath, carbonTableIdentifier) val dictFolderPath = carbonTablePath.getMetadataDirectoryPath val dimensions = carbonTable.getDimensionByTableName( carbonTable.getFactTableName).asScala.toArray @@ -649,37 +685,37 @@ case class LoadTable( carbonLoadModel.initPredefDictMap() // generate predefined dictionary GlobalDictionaryUtil - .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, - dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath) + .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, + dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath) } if (!StringUtils.isEmpty(all_dictionary_path)) { carbonLoadModel.initPredefDictMap() GlobalDictionaryUtil - .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext, - carbonLoadModel, - storePath, - carbonTableIdentifier, - dictFolderPath, - dimensions, - all_dictionary_path) + .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext, + carbonLoadModel, + storePath, + carbonTableIdentifier, + dictFolderPath, + dimensions, + all_dictionary_path) } // dictionaryServerClient dictionary generator val dictionaryServerPort = carbonProperty - .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, - CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) + .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, + CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) val sparkDriverHost = sparkSession.sqlContext.sparkContext. - getConf.get("spark.driver.host") + getConf.get("spark.driver.host") carbonLoadModel.setDictionaryServerHost(sparkDriverHost) // start dictionary server when use one pass load and dimension with DICTIONARY // encoding is present. val allDimensions = table.getAllDimensions.asScala.toList val createDictionary = allDimensions.exists { carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) + !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) } val server: Option[DictionaryServer] = if (createDictionary) { val dictionaryServer = DictionaryServer - .getInstance(dictionaryServerPort.toInt) + .getInstance(dictionaryServerPort.toInt) carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) sparkSession.sparkContext.addSparkListener(new SparkListener() { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { @@ -707,22 +743,13 @@ case class LoadTable( CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)) // getting all fields except tupleId field as it is not required in the value var otherFields = fields.toSeq - .filter(field => !field.name - .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - .map(field => { - if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) { - new Column(field.name - .substring(0, - field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))) - } else { - - new Column(field.name) - } - }) + .filter(field => !field.name + .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) + .map(field => new Column(field.name)) // extract tupleId field which will be used as a key val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") + .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId") // use dataFrameWithoutTupleId as dictionaryDataFrame val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) otherFields = otherFields :+ segIdColumn @@ -733,12 +760,11 @@ case class LoadTable( (dataFrame, dataFrame) } - GlobalDictionaryUtil - .generateGlobalDictionary( - sparkSession.sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - dictionaryDataFrame) + GlobalDictionaryUtil.generateGlobalDictionary( + sparkSession.sqlContext, + carbonLoadModel, + relation.tableMeta.storePath, + dictionaryDataFrame) CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, @@ -759,14 +785,14 @@ case class LoadTable( val fileType = FileFactory.getFileType(partitionLocation) if (FileFactory.isFileExist(partitionLocation, fileType)) { val file = FileFactory - .getCarbonFile(partitionLocation, fileType) + .getCarbonFile(partitionLocation, fileType) CarbonUtil.deleteFoldersAndFiles(file) } } catch { case ex: Exception => LOGGER.error(ex) LOGGER.audit(s"Dataload failure for $dbName.$tableName. " + - "Problem deleting the partition folder") + "Problem deleting the partition folder") throw ex } @@ -791,40 +817,19 @@ case class LoadTable( } } -private[sql] case class DeleteLoadByDate( +case class CleanFiles( databaseNameOp: Option[String], - tableName: String, - dateField: String, - loadDate: String) { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + tableName: String) extends RunnableCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable - CarbonStore.deleteLoadByDate( - loadDate, - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - carbonTable - ) - Seq.empty + processData(sparkSession) } -} - -case class CleanFiles( - databaseNameOp: Option[String], - tableName: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def run(sparkSession: SparkSession): Seq[Row] = { + override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore val relation = catalog - .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation] + .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation] val carbonTable = relation.tableMeta.carbonTable CarbonStore.cleanFiles( getDB.getDatabaseName(databaseNameOp, sparkSession), @@ -840,13 +845,17 @@ case class ShowLoads( databaseNameOp: Option[String], tableName: String, limit: Option[String], - override val output: Seq[Attribute]) extends RunnableCommand { + override val output: Seq[Attribute]) extends RunnableCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processData(sparkSession) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable + lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.showSegments( getDB.getDatabaseName(databaseNameOp, sparkSession), tableName, @@ -859,16 +868,20 @@ case class ShowLoads( case class CarbonDropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String], tableName: String) - extends RunnableCommand { + extends RunnableCommand with SchemaProcessCommand with DataProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + processData(sparkSession) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) val identifier = TableIdentifier(tableName, Option(dbName)) val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val storePath = catalog.storePath catalog.checkSchemasModifiedTimeAndReloadTables() val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { @@ -878,7 +891,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(storePath, identifier)(sparkSession) + .dropTable(catalog.storePath, identifier)(sparkSession) LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") } catch { case ex: Exception => @@ -889,29 +902,57 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean, if (unlocked) { logInfo("Table MetaData Unlocked Successfully") } - // deleting any remaining files. - val metadataFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath - val fileType = FileFactory.getFileType(metadataFilePath) - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFiles(file.getParentFile) - } } } Seq.empty } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + // delete the table folder + val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) + val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") + val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metadataFilePath = CarbonStorePath + .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataFilePath) + if (FileFactory.isFileExist(metadataFilePath, fileType)) { + val file = FileFactory.getCarbonFile(metadataFilePath, fileType) + CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) + } + Seq.empty + } } private[sql] case class DescribeCommandFormatted( child: SparkPlan, override val output: Seq[Attribute], tblIdentifier: TableIdentifier) - extends RunnableCommand { + extends RunnableCommand with SchemaProcessCommand { override def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = { + var results: Seq[(String, String, String)] = + Seq(("", "", ""), ("##Column Group Information", "", "")) + val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter { + case (groupId, _) => groupId != -1 + }.toSeq.sortBy(_._1) + val groups = groupedDimensions.map(colGroups => { + colGroups._2.map(dim => dim.getColName).mkString(", ") + }) + var index = 1 + groups.foreach { x => + results = results :+ (s"Column Group $index", x, "") + index = index + 1 + } + results + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation] + .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation] val mapper = new ObjectMapper() val colProps = StringBuilder.newBuilder val dims = relation.metaData.dims.map(x => x.toLowerCase) @@ -922,8 +963,8 @@ private[sql] case class DescribeCommandFormatted( relation.tableMeta.carbonTableIdentifier.getTableName, fieldName) if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) { colProps.append(fieldName).append(".") - .append(mapper.writeValueAsString(dimension.getColumnProperties)) - .append(",") + .append(mapper.writeValueAsString(dimension.getColumnProperties)) + .append(",") } if (dimension.hasEncoding(Encoding.DICTIONARY) && !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { @@ -950,7 +991,7 @@ private[sql] case class DescribeCommandFormatted( } results ++= Seq(("", "", ""), ("##Detailed Table Information", "", "")) results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier - .getDatabaseName, "") + .getDatabaseName, "") ) results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, "")) results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, "")) @@ -964,34 +1005,17 @@ private[sql] case class DescribeCommandFormatted( } results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( relation.tableMeta.carbonTableIdentifier.getTableName).asScala - .map(column => column).mkString(","), "")) + .map(column => column).mkString(","), "")) val dimension = carbonTable - .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName) + .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName) results ++= getColumnGroups(dimension.asScala.toList) if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { results ++= - Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName) - .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), "")) + Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName) + .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), "")) } results.map { case (name, dataType, comment) => Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s") } } - - private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = { - var results: Seq[(String, String, String)] = - Seq(("", "", ""), ("##Column Group Information", "", "")) - val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter { - case (groupId, _) => groupId != -1 - }.toSeq.sortBy(_._1) - val groups = groupedDimensions.map(colGroups => { - colGroups._2.map(dim => dim.getColName).mkString(", ") - }) - var index = 1 - groups.foreach { x => - results = results :+ (s"Column Group $index", x, "") - index = index + 1 - } - results - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 048681c..549841b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -430,8 +430,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca // while drop we should refresh the schema modified time so that if any thing has changed // in the other beeline need to update. checkSchemasModifiedTimeAndReloadTables - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) + val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableIdentifier.table) metadataToBeRemoved match { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7f0b108/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 03d0bde..a8f92ce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -163,22 +163,11 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String) (sparkSession: SparkSession): Unit = { val dbName = tableIdentifier.database.get val tableName = tableIdentifier.table - - val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, - new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) if (null != carbonTable) { // clear driver B-tree and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } - val fileType = FileFactory.getFileType(metadataFilePath) - - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - // while drop we should refresh the schema modified time so that if any thing has changed - // in the other beeline need to update. - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) - } CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
