[
https://issues.apache.org/jira/browse/CARBONDATA-198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15454468#comment-15454468
]
ASF GitHub Bot commented on CARBONDATA-198:
-------------------------------------------
Github user ravikiran23 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/116#discussion_r77119222
--- Diff:
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
---
@@ -344,49 +540,77 @@ object CarbonDataRDDFactory extends Logging {
if (loadsToMerge.size() > 1) {
- val sortedSegments: util.List[LoadMetadataDetails] = new
util.ArrayList[LoadMetadataDetails](
- segList
- )
- CarbonDataMergerUtil.sortSegments(sortedSegments)
- val lastSegment = sortedSegments.get(sortedSegments.size()-1)
-
- new Thread {
+ val compactionThread = new Thread {
override def run(): Unit = {
try {
- while (loadsToMerge.size() > 1) {
- deletePartialLoadsInCompaction(carbonLoadModel)
- val futureList: util.List[Future[Void]] = new
util.ArrayList[Future[Void]](
- CarbonCommonConstants
- .DEFAULT_COLLECTION_SIZE
- )
-
- scanSegmentsAndSubmitJob(futureList, loadsToMerge)
-
- futureList.asScala.foreach(future => {
- future.get
- }
- )
-
- // scan again and determine if anything is there to merge
again.
- readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
- segList = carbonLoadModel.getLoadMetadataDetails
- // in case of major compaction we will scan only once and
come out as it will keep
- // on doing major for the new loads also.
- // excluding the newly added segments.
- if (compactionModel.compactionType ==
CompactionType.MAJOR_COMPACTION) {
-
- segList = CarbonDataMergerUtil
- .filterOutNewlyAddedSegments(segList, loadsToMerge,
lastSegment)
+ executeCompaction(carbonLoadModel: CarbonLoadModel,
+ hdfsStoreLocation: String,
+ compactionModel: CompactionModel,
+ partitioner: Partitioner,
+ executor, sqlContext, kettleHomePath, storeLocation
+ )
+ // check for all the tables.
+ val isSystemCompactionLockEnabled =
CarbonProperties.getInstance()
+
.getProperty(CarbonCommonConstants.ENABLE_SYSTEM_LEVEL_COMPACTION_LOCK, "false")
+ .equalsIgnoreCase("true")
+
+ if (isSystemCompactionLockEnabled) {
+ logger.info("System level compaction lock is enabled.")
+
CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata.tablesMeta.map { c =>
--- End diff --
done
> Implementing system level lock for compaction.
> ----------------------------------------------
>
> Key: CARBONDATA-198
> URL: https://issues.apache.org/jira/browse/CARBONDATA-198
> Project: CarbonData
> Issue Type: Bug
> Components: spark-integration
> Reporter: ravikiran
>
> 1. Make the compaction DDL a blocking call.
> 2. Implement system-level compaction locking: only one compaction is allowed at a time;
> other requests will create a compaction request file.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)