Repository: incubator-carbondata
Updated Branches:
  refs/heads/master de56d0e40 -> 0d9970f66


Handle the exception cases in compaction.
If compaction fails for the table requested by the user, show the error 
message in beeline.
If any exception occurs, continue with the remaining queued compactions 
instead of aborting them.

Change the error message.

If deletion of the compaction-required file does not succeed, add that table 
to the skip list so that it won't be considered again for compaction. This 
avoids an infinite loop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/13b5d549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/13b5d549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/13b5d549

Branch: refs/heads/master
Commit: 13b5d549864f321fb8d47f2706aa120b94e409c7
Parents: de56d0e
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Wed Sep 14 18:09:54 2016 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Sun Sep 18 02:11:46 2016 +0530

----------------------------------------------------------------------
 .../spark/merger/CarbonCompactionUtil.java      | 10 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 75 ++++++++++++++------
 .../execution/command/carbonTableSchema.scala   | 30 +++++---
 3 files changed, 80 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
 
b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
index b3198b4..ed669a6 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionUtil.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
 import 
org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
@@ -262,14 +263,19 @@ public class CarbonCompactionUtil {
 
   /**
    * This will check if any compaction request has been received for any table.
+   *
    * @param tableMetas
    * @return
    */
-  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas) {
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+      List<CarbonTableIdentifier> skipList) {
     for (TableMeta table : tableMetas) {
       CarbonTable ctable = table.carbonTable();
       String metadataPath = ctable.getMetaDataFilepath();
-      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath)) {
+      // check for the compaction required file and at the same time exclude 
the tables which are
+      // present in the skip list.
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && 
!skipList
+          .contains(table.carbonTableIdentifier())) {
         return table;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 31cc8ac..1144c59 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,11 +32,11 @@ import 
org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, 
CompactionCallableModel, CompactionModel, Partitioner}
-import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, 
TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
@@ -363,6 +363,10 @@ object CarbonDataRDDFactory extends Logging {
         case e : Exception =>
           logger.error("Exception in start compaction thread. " + e.getMessage)
           lock.unlock()
+          // if the compaction is a blocking call then only need to throw the 
exception.
+          if (compactionModel.isDDLTrigger) {
+            throw e
+          }
       }
     }
     else {
@@ -537,13 +541,24 @@ object CarbonDataRDDFactory extends Logging {
         override def run(): Unit = {
 
           try {
-            executeCompaction(carbonLoadModel: CarbonLoadModel,
-              hdfsStoreLocation: String,
-              compactionModel: CompactionModel,
-              partitioner: Partitioner,
-              executor, sqlContext, kettleHomePath, storeLocation
-            )
-            // check for all the tables.
+            // compaction status of the table which is triggered by the user.
+            var triggeredCompactionStatus = false
+            var exception : Exception = null
+            try {
+              executeCompaction(carbonLoadModel: CarbonLoadModel,
+                hdfsStoreLocation: String,
+                compactionModel: CompactionModel,
+                partitioner: Partitioner,
+                executor, sqlContext, kettleHomePath, storeLocation
+              )
+              triggeredCompactionStatus = true
+            }
+            catch {
+              case e: Exception =>
+                logger.error("Exception in compaction thread " + e.getMessage)
+                exception = e
+            }
+            // continue in case of exception also, check for all the tables.
             val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
               .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
@@ -551,15 +566,16 @@ object CarbonDataRDDFactory extends Logging {
 
             if (!isConcurrentCompactionAllowed) {
               logger.info("System level compaction lock is enabled.")
+              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
               var tableForCompaction = CarbonCompactionUtil
                 
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                  .tablesMeta.toArray
+                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
                 )
-              while(null != tableForCompaction) {
+              while (null != tableForCompaction) {
                 logger
                   .info("Compaction request has been identified for table " + 
tableForCompaction
                     .carbonTable.getDatabaseName + "." + 
tableForCompaction.carbonTableIdentifier
-                    .getTableName
+                          .getTableName
                   )
                 val table: CarbonTable = tableForCompaction.carbonTable
                 val metadataPath = table.getMetaDataFilepath
@@ -590,32 +606,45 @@ object CarbonDataRDDFactory extends Logging {
                     executor, sqlContext, kettleHomePath, storeLocation
                   )
                 }
+                catch {
+                  case e: Exception =>
+                    logger.error("Exception in compaction thread for table " + 
tableForCompaction
+                      .carbonTable.getDatabaseName + "." +
+                                 tableForCompaction.carbonTableIdentifier
+                                   .getTableName)
+                  // not handling the exception. only logging as this is not 
the table triggered
+                  // by user.
+                }
                 finally {
-                  // delete the compaction required file
+                  // delete the compaction required file in case of failure or 
success also.
                   if (!CarbonCompactionUtil
                     .deleteCompactionRequiredFile(metadataPath, 
compactionType)) {
+                    // if the compaction request file is not been able to 
delete then
+                    // add those tables details to the skip list so that it 
wont be considered next.
+                    
skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                     logger
                       .error("Compaction request file can not be deleted for 
table " +
-                        tableForCompaction
-                        .carbonTable.getDatabaseName + "." + tableForCompaction
-                        .carbonTableIdentifier
-                        .getTableName
+                             tableForCompaction
+                               .carbonTable.getDatabaseName + "." + 
tableForCompaction
+                               .carbonTableIdentifier
+                               .getTableName
                       )
+
                   }
                 }
                 // ********* check again for all the tables.
                 tableForCompaction = CarbonCompactionUtil
                   
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                    .tablesMeta.toArray
+                    .tablesMeta.toArray, skipCompactionTables.asJava
                   )
               }
+              // giving the user his error for telling in the beeline if his 
triggered table
+              // compaction is failed.
+              if (!triggeredCompactionStatus) {
+                throw new Exception("Exception in compaction " + 
exception.getMessage)
+              }
             }
           }
-          catch {
-            case e: Exception =>
-              logger.error("Exception in compaction thread " + e.getMessage)
-              throw e
-          }
           finally {
             executor.shutdownNow()
             deletePartialLoadsInCompaction(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/13b5d549/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b8c56d3..19519ea 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -794,16 +794,26 @@ private[sql] case class 
AlterTableCompaction(alterTableModel: AlterTableModel) e
         System.getProperty("java.io.tmpdir")
       )
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-
-    CarbonDataRDDFactory
-      .alterTableForCompaction(sqlContext,
-        alterTableModel,
-        carbonLoadModel,
-        partitioner,
-        relation.tableMeta.storePath,
-        kettleHomePath,
-        storeLocation
-      )
+    try {
+      CarbonDataRDDFactory
+        .alterTableForCompaction(sqlContext,
+          alterTableModel,
+          carbonLoadModel,
+          partitioner,
+          relation.tableMeta.storePath,
+          kettleHomePath,
+          storeLocation
+        )
+    }
+    catch {
+      case e: Exception =>
+        if (null != e.getMessage) {
+          sys.error("Compaction failed. Please check logs for more info." + 
e.getMessage)
+        }
+        else {
+          sys.error("Exception in compaction. Please check logs for more 
info.")
+        }
+    }
 
     Seq.empty
   }

Reply via email to