This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1790c56  [CARBONDATA-3789] Fix cache issue in case of compaction 
failure in compaction post listeners
1790c56 is described below

commit 1790c56e6ecb1d059d60b244095b4fc7ed9c4ba2
Author: akashrn5 <[email protected]>
AuthorDate: Thu Apr 30 14:35:06 2020 +0530

    [CARBONDATA-3789] Fix cache issue in case of compaction failure in 
compaction post listeners
    
    Why is this PR needed?
    Consider a scenario where the post-event is fired after compaction and a 
listener selects the data of the newly compacted segment in order to load the 
SI table after the main table compaction. The cache is loaded during that 
select. If the SI load then fails, the cache entry is still present but the 
actual segment is not, which leads to consecutive failures.
    
    What changes were proposed in this PR?
    When such a failure happens, the cache entry for the compacted segment is 
cleared: in the index server if distributed pruning is enabled for the table, 
otherwise in the driver cache.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new test case added?
    No
    
    This closes #3733
---
 .../spark/rdd/CarbonTableCompactor.scala           | 36 ++++++++++++++++------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index b109294..af9a5c1 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -18,7 +18,7 @@
 package org.apache.carbondata.spark.rdd
 
 import java.util
-import java.util.List
+import java.util.{Collections, List}
 import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
@@ -41,12 +41,12 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, 
CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.CarbonInputSplit
-import org.apache.carbondata.indexserver.DistributedRDDUtils
+import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, 
CarbonDataMergerUtil, CompactionType}
@@ -92,12 +92,30 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
             loadsToMerge.size() > 0)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       deletePartialLoadsInCompaction()
-
+      val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
       try {
-        scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
+        scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, 
compactedLoad)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
+          // in case of exception, clear the cache loaded both in driver, and 
index server if
+          // enabled. Consider a scenario where listener is called for SI 
table to do load after
+          // compaction, then basically SI loads the new compacted load of 
main table to cache as it
+          // needs to select data from main table. after that if the load to 
SI fails, cache is to
+          // be cleared.
+          val compactedLoadToClear = compactedLoad.substring(
+            compactedLoad.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1)
+          if (!CarbonProperties.getInstance()
+            .isDistributedPruningEnabled(carbonLoadModel.getDatabaseName,
+              carbonLoadModel.getTableName)) {
+            IndexStoreManager.getInstance()
+              
.clearInvalidSegments(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+                Collections.singletonList(compactedLoadToClear))
+          } else {
+            IndexServer.getClient
+              
.invalidateSegmentCache(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+                Array(compactedLoadToClear))
+          }
           throw e
       }
 
@@ -133,7 +151,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
    * This will submit the loads to be merged into the executor.
    */
   def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
-      compactedSegments: List[String]): Unit = {
+      compactedSegments: List[String], mergedLoadName: String): Unit = {
     loadsToMerge.asScala.foreach { seg =>
       LOGGER.info("loads identified for merge is " + seg.getLoadName)
     }
@@ -145,10 +163,11 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
       compactionModel.compactionType,
       compactionModel.currentPartitions,
       compactedSegments)
-    triggerCompaction(compactionCallableModel)
+    triggerCompaction(compactionCallableModel, mergedLoadName: String)
   }
 
-  private def triggerCompaction(compactionCallableModel: 
CompactionCallableModel): Unit = {
+  private def triggerCompaction(compactionCallableModel: 
CompactionCallableModel,
+      mergedLoadName: String): Unit = {
     val carbonTable = compactionCallableModel.carbonTable
     val loadsToMerge = compactionCallableModel.loadsToMerge
     val sc = compactionCallableModel.sqlContext
@@ -157,7 +176,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val partitions = compactionCallableModel.currentPartitions
     val tablePath = carbonLoadModel.getTablePath
     val startTime = System.nanoTime()
-    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
     val mergedLoads = compactionCallableModel.compactedSegments
     mergedLoads.add(mergedLoadName)
     var finalMergeStatus = false

Reply via email to