Disable the system compaction lock feature, and make the load DDL wait
for compaction to finish in the auto-compaction case.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fe274a9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fe274a9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fe274a9f

Branch: refs/heads/branch-0.1
Commit: fe274a9f58e353f925374cf5ec51484ef4b23ffe
Parents: b55e7d3
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Sat Sep 17 15:18:22 2016 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Sep 22 10:52:31 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 64 ++++++++++----------
 .../CompactionSystemLockFeatureTest.scala       |  2 +
 3 files changed, 37 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe274a9f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 41d6ebf..1d60ee9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -911,6 +911,7 @@ public final class CarbonCommonConstants {
   public static String majorCompactionRequiredFile = 
"compactionRequired_major";
 
   /**
+   * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run 
at once.
    */
   public static String ENABLE_CONCURRENT_COMPACTION =
@@ -920,7 +921,7 @@ public final class CarbonCommonConstants {
    * Default value of Property for enabling system level compaction lock.1 
compaction can run
    * at once.
    */
-  public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "false";
+  public static String DEFAULT_ENABLE_CONCURRENT_COMPACTION = "true";
 
   /**
    * Compaction system level lock folder.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe274a9f/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 964c955..3e0388f 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -592,14 +592,9 @@ object CarbonDataRDDFactory extends Logging {
           }
         }
       }
-      if(compactionModel.isDDLTrigger) {
-        // making this an blocking call for DDL
-        compactionThread.run()
-      }
-      else {
-        // non blocking call in case of auto compaction.
-        compactionThread.start()
-      }
+    // calling the run method of a thread to make the call as blocking call.
+    // in the future we may make this as concurrent.
+    compactionThread.run()
   }
 
   def prepareCarbonLoadModel(hdfsStoreLocation: String,
@@ -715,6 +710,7 @@ object CarbonDataRDDFactory extends Logging {
               case e : Exception =>
                 logger.error("Exception in start compaction thread. " + 
e.getMessage)
                 lock.unlock()
+                throw e
             }
           }
           else {
@@ -786,9 +782,6 @@ object CarbonDataRDDFactory extends Logging {
       val schemaLastUpdatedTime = 
CarbonEnv.getInstance(sqlContext).carbonCatalog
         .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, 
carbonLoadModel.getTableName)
 
-      // compaction handling
-      handleSegmentMerging(tableCreationTime)
-
       // get partition way from configuration
       // val isTableSplitPartition = 
CarbonProperties.getInstance().getProperty(
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -999,28 +992,37 @@ object CarbonDataRDDFactory extends Logging {
         logWarning("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
-        val metadataDetails = status(0)._2
-        if (!isAgg) {
-          val status = CarbonLoaderUtil
-            .recordLoadMetadata(currentLoadCount,
-              metadataDetails,
-              carbonLoadModel,
-              loadStatus,
-              loadStartTime
-            )
-          if (!status) {
-            val errorMessage = "Dataload failed due to failure in table status 
updation."
-            logger.audit("Data load is failed for " +
-              
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            logger.error("Dataload failed due to failure in table status 
updation.")
-            throw new Exception(errorMessage)
+          val metadataDetails = status(0)._2
+          if (!isAgg) {
+            val status = CarbonLoaderUtil
+              .recordLoadMetadata(currentLoadCount,
+                metadataDetails,
+                carbonLoadModel,
+                loadStatus,
+                loadStartTime
+              )
+            if (!status) {
+              val errorMessage = "Dataload failed due to failure in table 
status updation."
+              logger.audit("Data load is failed for " +
+                           
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+              logger.error("Dataload failed due to failure in table status 
updation.")
+              throw new Exception(errorMessage)
+            }
+          } else if (!carbonLoadModel.isRetentionRequest) {
+            // TODO : Handle it
+            logInfo("********Database updated**********")
           }
-        } else if (!carbonLoadModel.isRetentionRequest) {
-          // TODO : Handle it
-          logInfo("********Database updated**********")
+          logger.audit("Data load is successful for " +
+                       
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+        try {
+          // compaction handling
+          handleSegmentMerging(tableCreationTime)
+        }
+        catch {
+          case e: Exception =>
+            throw new Exception(
+              "Dataload is success. Auto-Compaction has failed. Please check 
logs.")
         }
-        logger.audit("Data load is successful for " +
-          
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fe274a9f/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index d9e1349..a040550 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -43,6 +43,8 @@ class CompactionSystemLockFeatureTest extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists  table2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, "false")
     sql(
       "CREATE TABLE IF NOT EXISTS table1 (country String, ID Int, date 
Timestamp, name " +
         "String, " +

Reply via email to