Repository: carbondata
Updated Branches:
  refs/heads/master 15ab6b066 -> b88c09707


[CARBONDATA-2470] Refactor AlterTableCompactionPostStatusUpdateEvent usage in compaction flow

AlterTableCompactionPostStatusUpdateEvent is a generic event fired after
compaction, but whether it was fired was controlled solely by the pre-aggregate
listener. Once CommitPreAggregateListener set the commitComplete property to
true, the event was no longer fired in subsequent compaction iterations.
The event must be fired for every compacted segment, so the compaction flow now
always fires it, and the commitComplete check has moved into
CommitPreAggregateListener itself.

This closes #2295


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b88c0970
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b88c0970
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b88c0970

Branch: refs/heads/master
Commit: b88c097075b2ef859d25db27da08435d66a814a2
Parents: 15ab6b0
Author: dhatchayani <dhatcha.offic...@gmail.com>
Authored: Thu May 10 19:07:53 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Fri May 18 11:28:37 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonTableCompactor.scala        | 26 ++----
 .../preaaggregate/PreAggregateListeners.scala   | 88 ++++++++++++--------
 2 files changed, 59 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b88c0970/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 199b7a3..155bdd1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -270,26 +270,14 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
         carbonMergerMapping,
         carbonLoadModel,
         mergedLoadName)
-      val commitComplete = try {
-        // Once main table compaction is done and 0.1, 4.1, 8.1 is created 
commit will happen for
-        // all the tables. The commit listener will compact the child tables 
until no more segments
-        // are left. But 2nd level compaction is yet to happen on the main 
table therefore again the
-        // compaction flow will try to commit the child tables which is wrong. 
This check tell the
-        // 2nd level compaction flow that the commit for datamaps is already 
done.
-        val isCommitDone = operationContext.getProperty("commitComplete")
-        if (isCommitDone != null) {
-          isCommitDone.toString.toBoolean
-        } else {
-          OperationListenerBus.getInstance()
-            .fireEvent(compactionLoadStatusPostEvent, operationContext)
-          true
-        }
-      } catch {
-        case ex: Exception =>
-          LOGGER.error(ex, "Problem while committing data maps")
-          false
+      OperationListenerBus.getInstance()
+        .fireEvent(compactionLoadStatusPostEvent, operationContext)
+      val commitDone = operationContext.getProperty("commitComplete")
+      val commitComplete = if (null != commitDone) {
+        commitDone.toString.toBoolean
+      } else {
+        true
       }
-      operationContext.setProperty("commitComplete", commitComplete)
       // here either of the conditions can be true, when delete segment is 
fired after compaction
       // has started, statusFileUpdation will be false , but at the same time 
commitComplete can be
       // true because compaction for all datamaps will be finished at a time 
to the maximum level

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b88c0970/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 5e11884..a41f78c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -512,6 +512,14 @@ object CommitPreAggregateListener extends 
OperationEventListener with CommitHelp
       case loadEvent: LoadTablePostStatusUpdateEvent =>
         loadEvent.getCarbonLoadModel
       case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
+        // Once main table compaction is done and 0.1, 4.1, 8.1 is created 
commit will happen for
+        // all the tables. The commit listener will compact the child tables 
until no more segments
+        // are left. But 2nd level compaction is yet to happen on the main 
table therefore again the
+        // compaction flow will try to commit the child tables which is wrong. 
This check tell the
+        // 2nd level compaction flow that the commit for datamaps is already 
done.
+        if (null != operationContext.getProperty("commitComplete")) {
+          return
+        }
         compactionEvent.carbonLoadModel
     }
     val isCompactionFlow = Option(
@@ -533,45 +541,53 @@ object CommitPreAggregateListener extends 
OperationEventListener with CommitHelp
             .asInstanceOf[CarbonLoadDataCommand]
         }
       }
-     if (dataMapSchemas.nonEmpty) {
-       val uuid = operationContext.getProperty("uuid").toString
-      // keep committing until one fails
-      val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
-        val childCarbonTable = childLoadCommand.table
-        // Generate table status file name with UUID, forExample: tablestatus_1
-        val oldTableSchemaPath = 
CarbonTablePath.getTableStatusFilePathWithUUID(
-          childCarbonTable.getTablePath, uuid)
-        // Generate table status file name without UUID, forExample: 
tablestatus
-        val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
-          childCarbonTable.getTablePath)
-        renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, 
uuid)
-      }
-      // if true then the commit for one of the child tables has failed
-      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) 
!= 0
-      if (commitFailed) {
-        LOGGER.warn("Reverting table status file to original state")
-        renamedDataMaps.foreach {
-          loadCommand =>
-            val carbonTable = loadCommand.table
-            // rename the backup tablestatus i.e tablestatus_backup_UUID to 
tablestatus
-            val backupTableSchemaPath =
-              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) 
+ "_backup_" + uuid
-            val tableSchemaPath = 
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
-            markInProgressSegmentAsDeleted(backupTableSchemaPath, 
operationContext, carbonTable)
-            renameDataMapTableStatusFiles(backupTableSchemaPath, 
tableSchemaPath, "")
+    var commitFailed = false
+    try {
+      if (dataMapSchemas.nonEmpty) {
+        val uuid = operationContext.getProperty("uuid").toString
+        // keep committing until one fails
+        val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
+          val childCarbonTable = childLoadCommand.table
+          // Generate table status file name with UUID, forExample: 
tablestatus_1
+          val oldTableSchemaPath = 
CarbonTablePath.getTableStatusFilePathWithUUID(
+            childCarbonTable.getTablePath, uuid)
+          // Generate table status file name without UUID, forExample: 
tablestatus
+          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+            childCarbonTable.getTablePath)
+          renameDataMapTableStatusFiles(oldTableSchemaPath, 
newTableSchemaPath, uuid)
+        }
+        // if true then the commit for one of the child tables has failed
+        commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
+        if (commitFailed) {
+          LOGGER.warn("Reverting table status file to original state")
+          renamedDataMaps.foreach {
+            loadCommand =>
+              val carbonTable = loadCommand.table
+              // rename the backup tablestatus i.e tablestatus_backup_UUID to 
tablestatus
+              val backupTableSchemaPath =
+                
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" +
+                uuid
+              val tableSchemaPath = CarbonTablePath
+                .getTableStatusFilePath(carbonTable.getTablePath)
+              markInProgressSegmentAsDeleted(backupTableSchemaPath, 
operationContext, carbonTable)
+              renameDataMapTableStatusFiles(backupTableSchemaPath, 
tableSchemaPath, "")
+          }
+        }
+        // after success/failure of commit delete all tablestatus files with 
UUID in their names.
+        // if commit failed then remove the segment directory
+        cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
+          operationContext,
+          uuid)
+        operationContext.setProperty("commitComplete", !commitFailed)
+        if (commitFailed) {
+          sys.error("Failed to update table status for pre-aggregate table")
         }
       }
-      // after success/failure of commit delete all tablestatus files with 
UUID in their names.
-      // if commit failed then remove the segment directory
-      cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
-        operationContext,
-        uuid)
-      if (commitFailed) {
-        sys.error("Failed to update table status for pre-aggregate table")
-      }
+    } catch {
+      case e: Exception =>
+        operationContext.setProperty("commitComplete", false)
+        LOGGER.error(e, "Problem while committing data maps")
     }
-
-
   }
 }
 

Reply via email to