Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401743
--- Diff:
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
---
@@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
)
}
- val compactionThread = new Thread {
- override def run(): Unit = {
+ val compactionThread = new Thread {
+ override def run(): Unit = {
+ try {
+ // compaction status of the table which is triggered by the user.
+ var triggeredCompactionStatus = false
+ var exception: Exception = null
try {
- // compaction status of the table which is triggered by the
user.
- var triggeredCompactionStatus = false
- var exception : Exception = null
- try {
- executeCompaction(carbonLoadModel: CarbonLoadModel,
- hdfsStoreLocation: String,
- compactionModel: CompactionModel,
- partitioner: Partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
+ executeCompaction(carbonLoadModel: CarbonLoadModel,
+ hdfsStoreLocation: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
+ )
+ triggeredCompactionStatus = true
+ }
+ catch {
+ case e: Exception =>
+ logger.error("Exception in compaction thread " +
e.getMessage)
+ exception = e
+ }
+ // continue in case of exception also, check for all the tables.
+ val isConcurrentCompactionAllowed =
CarbonProperties.getInstance()
+
.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+ ).equalsIgnoreCase("true")
+
+ if (!isConcurrentCompactionAllowed) {
+ logger.info("System level compaction lock is enabled.")
+ val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
+ var tableForCompaction = CarbonCompactionUtil
+
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+ .tablesMeta.toArray, skipCompactionTables.toList.asJava
)
- triggeredCompactionStatus = true
- }
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread " +
e.getMessage)
- exception = e
- }
- // continue in case of exception also, check for all the
tables.
- val isConcurrentCompactionAllowed =
CarbonProperties.getInstance()
-
.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
- CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
- ).equalsIgnoreCase("true")
-
- if (!isConcurrentCompactionAllowed) {
- logger.info("System level compaction lock is enabled.")
- val skipCompactionTables =
ListBuffer[CarbonTableIdentifier]()
- var tableForCompaction = CarbonCompactionUtil
-
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.toList.asJava
+ while (null != tableForCompaction) {
+ logger
+ .info("Compaction request has been identified for table "
+ tableForCompaction
+ .carbonTable.getDatabaseName + "." +
tableForCompaction.carbonTableIdentifier
+ .getTableName
)
- while (null != tableForCompaction) {
- logger
- .info("Compaction request has been identified for table
" + tableForCompaction
- .carbonTable.getDatabaseName + "." +
tableForCompaction.carbonTableIdentifier
- .getTableName
- )
- val table: CarbonTable = tableForCompaction.carbonTable
- val metadataPath = table.getMetaDataFilepath
- val compactionType =
CarbonCompactionUtil.determineCompactionType(metadataPath)
-
- val newCarbonLoadModel = new CarbonLoadModel()
- prepareCarbonLoadModel(hdfsStoreLocation, table,
newCarbonLoadModel)
- val tableCreationTime =
CarbonEnv.getInstance(sqlContext).carbonCatalog
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
-
- val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
- val newcompactionModel = CompactionModel(compactionSize,
- compactionType,
- table,
- tableCreationTime,
- compactionModel.isDDLTrigger
+ val table: CarbonTable = tableForCompaction.carbonTable
+ val metadataPath = table.getMetaDataFilepath
+ val compactionType =
CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+ val newCarbonLoadModel = new CarbonLoadModel()
+ prepareCarbonLoadModel(hdfsStoreLocation, table,
newCarbonLoadModel)
+ val tableCreationTime =
CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
+ )
+
+ val compactionSize = CarbonDataMergerUtil
+ .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+ val newcompactionModel = CompactionModel(compactionSize,
+ compactionType,
+ table,
+ tableCreationTime,
+ compactionModel.isDDLTrigger
+ )
+ // proceed for compaction
+ try {
+ executeCompaction(newCarbonLoadModel,
+ newCarbonLoadModel.getStorePath,
+ newcompactionModel,
+ partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
)
- // proceed for compaction
- try {
- executeCompaction(newCarbonLoadModel,
- newCarbonLoadModel.getStorePath,
- newcompactionModel,
- partitioner,
- executor, sqlContext, kettleHomePath, storeLocation
- )
- }
- catch {
- case e: Exception =>
- logger.error("Exception in compaction thread for table
" + tableForCompaction
- .carbonTable.getDatabaseName + "." +
- tableForCompaction.carbonTableIdentifier
- .getTableName)
- // not handling the exception. only logging as this is
not the table triggered
- // by user.
- }
- finally {
- // delete the compaction required file in case of
failure or success also.
- if (!CarbonCompactionUtil
- .deleteCompactionRequiredFile(metadataPath,
compactionType)) {
- // if the compaction request file is not been able to
delete then
- // add those tables details to the skip list so that
it wont be considered next.
-
skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
- logger
- .error("Compaction request file can not be deleted
for table " +
- tableForCompaction
- .carbonTable.getDatabaseName + "." +
tableForCompaction
- .carbonTableIdentifier
- .getTableName
- )
-
- }
- }
- // ********* check again for all the tables.
- tableForCompaction = CarbonCompactionUtil
-
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
}
- // giving the user his error for telling in the beeline if
his triggered table
- // compaction is failed.
- if (!triggeredCompactionStatus) {
- throw new Exception("Exception in compaction " +
exception.getMessage)
+ catch {
+ case e: Exception =>
+ logger.error("Exception in compaction thread for table "
+ tableForCompaction
+ .carbonTable.getDatabaseName + "." +
+ tableForCompaction.carbonTableIdentifier
+ .getTableName)
--- End diff --
use `s" "` instead of string concat
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---