Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401743
  
    --- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the 
user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    +            case e: Exception =>
    +              logger.error("Exception in compaction thread " + 
e.getMessage)
    +              exception = e
    +          }
    +          // continue in case of exception also, check for all the tables.
    +          val isConcurrentCompactionAllowed = 
CarbonProperties.getInstance()
    +            
.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    +              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    +            ).equalsIgnoreCase("true")
    +
    +          if (!isConcurrentCompactionAllowed) {
    +            logger.info("System level compaction lock is enabled.")
    +            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    +            var tableForCompaction = CarbonCompactionUtil
    +              
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    +                .tablesMeta.toArray, skipCompactionTables.toList.asJava
                   )
    -              triggeredCompactionStatus = true
    -            }
    -            catch {
    -              case e: Exception =>
    -                logger.error("Exception in compaction thread " + 
e.getMessage)
    -                exception = e
    -            }
    -            // continue in case of exception also, check for all the 
tables.
    -            val isConcurrentCompactionAllowed = 
CarbonProperties.getInstance()
    -              
.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    -                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    -              ).equalsIgnoreCase("true")
    -
    -            if (!isConcurrentCompactionAllowed) {
    -              logger.info("System level compaction lock is enabled.")
    -              val skipCompactionTables = 
ListBuffer[CarbonTableIdentifier]()
    -              var tableForCompaction = CarbonCompactionUtil
    -                
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
    +            while (null != tableForCompaction) {
    +              logger
    +                .info("Compaction request has been identified for table " 
+ tableForCompaction
    +                  .carbonTable.getDatabaseName + "." + 
tableForCompaction.carbonTableIdentifier
    +                        .getTableName
                     )
    -              while (null != tableForCompaction) {
    -                logger
    -                  .info("Compaction request has been identified for table 
" + tableForCompaction
    -                    .carbonTable.getDatabaseName + "." + 
tableForCompaction.carbonTableIdentifier
    -                          .getTableName
    -                  )
    -                val table: CarbonTable = tableForCompaction.carbonTable
    -                val metadataPath = table.getMetaDataFilepath
    -                val compactionType = 
CarbonCompactionUtil.determineCompactionType(metadataPath)
    -
    -                val newCarbonLoadModel = new CarbonLoadModel()
    -                prepareCarbonLoadModel(hdfsStoreLocation, table, 
newCarbonLoadModel)
    -                val tableCreationTime = 
CarbonEnv.getInstance(sqlContext).carbonCatalog
    -                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    -                    newCarbonLoadModel.getTableName
    -                  )
    -
    -                val compactionSize = CarbonDataMergerUtil
    -                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    -
    -                val newcompactionModel = CompactionModel(compactionSize,
    -                  compactionType,
    -                  table,
    -                  tableCreationTime,
    -                  compactionModel.isDDLTrigger
    +              val table: CarbonTable = tableForCompaction.carbonTable
    +              val metadataPath = table.getMetaDataFilepath
    +              val compactionType = 
CarbonCompactionUtil.determineCompactionType(metadataPath)
    +
    +              val newCarbonLoadModel = new CarbonLoadModel()
    +              prepareCarbonLoadModel(hdfsStoreLocation, table, 
newCarbonLoadModel)
    +              val tableCreationTime = 
CarbonEnv.getInstance(sqlContext).carbonCatalog
    +                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    +                  newCarbonLoadModel.getTableName
    +                )
    +
    +              val compactionSize = CarbonDataMergerUtil
    +                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    +
    +              val newcompactionModel = CompactionModel(compactionSize,
    +                compactionType,
    +                table,
    +                tableCreationTime,
    +                compactionModel.isDDLTrigger
    +              )
    +              // proceed for compaction
    +              try {
    +                executeCompaction(newCarbonLoadModel,
    +                  newCarbonLoadModel.getStorePath,
    +                  newcompactionModel,
    +                  partitioner,
    +                  executor, sqlContext, kettleHomePath, storeLocation
                     )
    -                // proceed for compaction
    -                try {
    -                  executeCompaction(newCarbonLoadModel,
    -                    newCarbonLoadModel.getStorePath,
    -                    newcompactionModel,
    -                    partitioner,
    -                    executor, sqlContext, kettleHomePath, storeLocation
    -                  )
    -                }
    -                catch {
    -                  case e: Exception =>
    -                    logger.error("Exception in compaction thread for table 
" + tableForCompaction
    -                      .carbonTable.getDatabaseName + "." +
    -                                 tableForCompaction.carbonTableIdentifier
    -                                   .getTableName)
    -                  // not handling the exception. only logging as this is 
not the table triggered
    -                  // by user.
    -                }
    -                finally {
    -                  // delete the compaction required file in case of 
failure or success also.
    -                  if (!CarbonCompactionUtil
    -                    .deleteCompactionRequiredFile(metadataPath, 
compactionType)) {
    -                    // if the compaction request file is not been able to 
delete then
    -                    // add those tables details to the skip list so that 
it wont be considered next.
    -                    
skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
    -                    logger
    -                      .error("Compaction request file can not be deleted 
for table " +
    -                             tableForCompaction
    -                               .carbonTable.getDatabaseName + "." + 
tableForCompaction
    -                               .carbonTableIdentifier
    -                               .getTableName
    -                      )
    -
    -                  }
    -                }
    -                // ********* check again for all the tables.
    -                tableForCompaction = CarbonCompactionUtil
    -                  
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                    .tablesMeta.toArray, skipCompactionTables.asJava
    -                  )
                   }
    -              // giving the user his error for telling in the beeline if 
his triggered table
    -              // compaction is failed.
    -              if (!triggeredCompactionStatus) {
    -                throw new Exception("Exception in compaction " + 
exception.getMessage)
    +              catch {
    +                case e: Exception =>
    +                  logger.error("Exception in compaction thread for table " 
+ tableForCompaction
    +                    .carbonTable.getDatabaseName + "." +
    +                               tableForCompaction.carbonTableIdentifier
    +                                 .getTableName)
    --- End diff --
    
    Use string interpolation (`s"..."`) instead of string concatenation with `+` when building these log messages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastruct...@apache.org or file
a JIRA ticket with INFRA.
---

Reply via email to