Repository: incubator-carbondata Updated Branches: refs/heads/master 8802e9ebb -> 17f602dec
Disable the system compaction lock feature, and make the load DDL wait for compaction to finish in the auto-compaction case. Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9702f7a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9702f7a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9702f7a8 Branch: refs/heads/master Commit: 9702f7a89179a610bd4c8c87e52994d94a946257 Parents: 8802e9e Author: ravikiran <ravikiran.sn...@gmail.com> Authored: Sat Sep 17 15:18:22 2016 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Sun Sep 18 03:09:57 2016 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 3 +- .../spark/rdd/CarbonDataRDDFactory.scala | 64 ++++++++++---------- .../CompactionSystemLockFeatureTest.scala | 2 + 3 files changed, 37 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 41d6ebf..1d60ee9 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -911,6 +911,7 @@ public final class CarbonCommonConstants { public static String majorCompactionRequiredFile = "compactionRequired_major"; /** + * @Deprecated : This property has been deprecated. * Property for enabling system level compaction lock.1 compaction can run at once. 
*/ public static String ENABLE_CONCURRENT_COMPACTION = @@ -920,7 +921,7 @@ public final class CarbonCommonConstants { * Default value of Property for enabling system level compaction lock.1 compaction can run * at once. */ - public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "false"; + public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "true"; /** * Compaction system level lock folder. http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 5a1d14f..c50720a 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -652,14 +652,9 @@ object CarbonDataRDDFactory extends Logging { } } } - if(compactionModel.isDDLTrigger) { - // making this an blocking call for DDL - compactionThread.run() - } - else { - // non blocking call in case of auto compaction. - compactionThread.start() - } + // calling the run method of a thread to make the call as blocking call. + // in the future we may make this as concurrent. + compactionThread.run() } def prepareCarbonLoadModel(hdfsStoreLocation: String, @@ -775,6 +770,7 @@ object CarbonDataRDDFactory extends Logging { case e : Exception => logger.error("Exception in start compaction thread. 
" + e.getMessage) lock.unlock() + throw e } } else { @@ -846,9 +842,6 @@ object CarbonDataRDDFactory extends Logging { val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext).carbonCatalog .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - // compaction handling - handleSegmentMerging(tableCreationTime) - // get partition way from configuration // val isTableSplitPartition = CarbonProperties.getInstance().getProperty( // CarbonCommonConstants.TABLE_SPLIT_PARTITION, @@ -1059,28 +1052,37 @@ object CarbonDataRDDFactory extends Logging { logWarning("Cannot write load metadata file as data load failed") throw new Exception(errorMessage) } else { - val metadataDetails = status(0)._2 - if (!isAgg) { - val status = CarbonLoaderUtil - .recordLoadMetadata(currentLoadCount, - metadataDetails, - carbonLoadModel, - loadStatus, - loadStartTime - ) - if (!status) { - val errorMessage = "Dataload failed due to failure in table status updation." - logger.audit("Data load is failed for " + - s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") - logger.error("Dataload failed due to failure in table status updation.") - throw new Exception(errorMessage) + val metadataDetails = status(0)._2 + if (!isAgg) { + val status = CarbonLoaderUtil + .recordLoadMetadata(currentLoadCount, + metadataDetails, + carbonLoadModel, + loadStatus, + loadStartTime + ) + if (!status) { + val errorMessage = "Dataload failed due to failure in table status updation." 
+ logger.audit("Data load is failed for " + + s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") + logger.error("Dataload failed due to failure in table status updation.") + throw new Exception(errorMessage) + } + } else if (!carbonLoadModel.isRetentionRequest) { + // TODO : Handle it + logInfo("********Database updated**********") } - } else if (!carbonLoadModel.isRetentionRequest) { - // TODO : Handle it - logInfo("********Database updated**********") + logger.audit("Data load is successful for " + + s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") + try { + // compaction handling + handleSegmentMerging(tableCreationTime) + } + catch { + case e: Exception => + throw new Exception( + "Dataload is success. Auto-Compaction has failed. Please check logs.") } - logger.audit("Data load is successful for " + - s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9702f7a8/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala index d9e1349..a040550 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala @@ -43,6 +43,8 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll { sql("drop table if exists table2") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy") + 
CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, "false") sql( "CREATE TABLE IF NOT EXISTS table1 (country String, ID Int, date Timestamp, name " + "String, " +