Github user jackylk commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401743 --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala --- @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging { ) } - val compactionThread = new Thread { - override def run(): Unit = { + val compactionThread = new Thread { + override def run(): Unit = { + try { + // compaction status of the table which is triggered by the user. + var triggeredCompactionStatus = false + var exception: Exception = null try { - // compaction status of the table which is triggered by the user. - var triggeredCompactionStatus = false - var exception : Exception = null - try { - executeCompaction(carbonLoadModel: CarbonLoadModel, - hdfsStoreLocation: String, - compactionModel: CompactionModel, - partitioner: Partitioner, - executor, sqlContext, kettleHomePath, storeLocation + executeCompaction(carbonLoadModel: CarbonLoadModel, + hdfsStoreLocation: String, + compactionModel: CompactionModel, + partitioner: Partitioner, + executor, sqlContext, kettleHomePath, storeLocation + ) + triggeredCompactionStatus = true + } + catch { + case e: Exception => + logger.error("Exception in compaction thread " + e.getMessage) + exception = e + } + // continue in case of exception also, check for all the tables. + val isConcurrentCompactionAllowed = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION + ).equalsIgnoreCase("true") + + if (!isConcurrentCompactionAllowed) { + logger.info("System level compaction lock is enabled.") + val skipCompactionTables = ListBuffer[CarbonTableIdentifier]() + var tableForCompaction = CarbonCompactionUtil + .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata + .tablesMeta.toArray, skipCompactionTables.toList.asJava ) - triggeredCompactionStatus = true - } - catch { - case e: Exception => - logger.error("Exception in compaction thread " + e.getMessage) - exception = e - } - // continue in case of exception also, check for all the tables. - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, - CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION - ).equalsIgnoreCase("true") - - if (!isConcurrentCompactionAllowed) { - logger.info("System level compaction lock is enabled.") - val skipCompactionTables = ListBuffer[CarbonTableIdentifier]() - var tableForCompaction = CarbonCompactionUtil - .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata - .tablesMeta.toArray, skipCompactionTables.toList.asJava + while (null != tableForCompaction) { + logger + .info("Compaction request has been identified for table " + tableForCompaction + .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier + .getTableName ) - while (null != tableForCompaction) { - logger - .info("Compaction request has been identified for table " + tableForCompaction - .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier - .getTableName - ) - val table: CarbonTable = tableForCompaction.carbonTable - val metadataPath = table.getMetaDataFilepath - val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) - - val newCarbonLoadModel = new CarbonLoadModel() - prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel) - val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog - .getTableCreationTime(newCarbonLoadModel.getDatabaseName, - newCarbonLoadModel.getTableName - ) - - val compactionSize = CarbonDataMergerUtil - .getCompactionSize(CompactionType.MAJOR_COMPACTION) - - val newcompactionModel = CompactionModel(compactionSize, - compactionType, - table, - tableCreationTime, - compactionModel.isDDLTrigger + val table: CarbonTable = tableForCompaction.carbonTable + val metadataPath = table.getMetaDataFilepath + val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) + + val newCarbonLoadModel = new CarbonLoadModel() + prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel) + val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog + .getTableCreationTime(newCarbonLoadModel.getDatabaseName, + newCarbonLoadModel.getTableName + ) + + val compactionSize = CarbonDataMergerUtil + .getCompactionSize(CompactionType.MAJOR_COMPACTION) + + val newcompactionModel = CompactionModel(compactionSize, + compactionType, + table, + tableCreationTime, + compactionModel.isDDLTrigger + ) + // proceed for compaction + try { + executeCompaction(newCarbonLoadModel, + newCarbonLoadModel.getStorePath, + newcompactionModel, + partitioner, + executor, sqlContext, kettleHomePath, storeLocation ) - // proceed for compaction - try { - executeCompaction(newCarbonLoadModel, - newCarbonLoadModel.getStorePath, - newcompactionModel, - partitioner, - executor, sqlContext, kettleHomePath, storeLocation - ) - } - catch { - case e: Exception => - logger.error("Exception in compaction thread for table " + tableForCompaction - .carbonTable.getDatabaseName + "." + - tableForCompaction.carbonTableIdentifier - .getTableName) - // not handling the exception. only logging as this is not the table triggered - // by user. - } - finally { - // delete the compaction required file in case of failure or success also. - if (!CarbonCompactionUtil - .deleteCompactionRequiredFile(metadataPath, compactionType)) { - // if the compaction request file is not been able to delete then - // add those tables details to the skip list so that it wont be considered next. - skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier) - logger - .error("Compaction request file can not be deleted for table " + - tableForCompaction - .carbonTable.getDatabaseName + "." + tableForCompaction - .carbonTableIdentifier - .getTableName - ) - - } - } - // ********* check again for all the tables. - tableForCompaction = CarbonCompactionUtil - .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata - .tablesMeta.toArray, skipCompactionTables.asJava - ) } - // giving the user his error for telling in the beeline if his triggered table - // compaction is failed. - if (!triggeredCompactionStatus) { - throw new Exception("Exception in compaction " + exception.getMessage) + catch { + case e: Exception => + logger.error("Exception in compaction thread for table " + tableForCompaction + .carbonTable.getDatabaseName + "." + + tableForCompaction.carbonTableIdentifier + .getTableName) --- End diff -- use `s" "` instead of string concat
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---