This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1790c56 [CARBONDATA-3789] Fix cache issue in case of compaction failure in compaction post listeners
1790c56 is described below
commit 1790c56e6ecb1d059d60b244095b4fc7ed9c4ba2
Author: akashrn5 <[email protected]>
AuthorDate: Thu Apr 30 14:35:06 2020 +0530
[CARBONDATA-3789] Fix cache issue in case of compaction failure in compaction post listeners
Why is this PR needed?
Consider a scenario where the compaction post-event is fired and a listener
then loads the SI table after main table compaction, selecting data from the
newly compacted main table segment. During that load the compacted segment is
brought into the cache. If the SI load fails, the cache entry is still
present but the actual segment is not, which leads to consecutive failures.
What changes were proposed in this PR?
When such a failure happens, the stale entry should be cleared from the
cache, either in the index server (when distributed pruning is enabled) or in
the driver cache.
Does this PR introduce any user interface change?
No
Is any new test case added?
No
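As a rough sketch, the shape of the fix inside the catch block is the
following. The wrapper clearStaleCompactedLoad and its simplified signature
are illustrative only and not part of this commit; the actual calls are the
ones added to CarbonTableCompactor.scala in the diff below:

    // Sketch under assumptions: the helper is hypothetical, the calls mirror the diff.
    def clearStaleCompactedLoad(
        carbonTable: CarbonTable,
        dbName: String,
        tableName: String,
        mergedLoadName: String): Unit = {
      // Caches are keyed by segment id, so strip the prefix before the last underscore
      // (e.g. a merged load name such as "Segment_0.1" yields segment id "0.1").
      val segmentId = mergedLoadName.substring(mergedLoadName.lastIndexOf('_') + 1)
      if (!CarbonProperties.getInstance().isDistributedPruningEnabled(dbName, tableName)) {
        // Pruning runs in the driver, so the stale entry lives in the driver cache.
        IndexStoreManager.getInstance()
          .clearInvalidSegments(carbonTable, java.util.Collections.singletonList(segmentId))
      } else {
        // With distributed pruning enabled, the index server holds the cache instead.
        IndexServer.getClient.invalidateSegmentCache(carbonTable, Array(segmentId))
      }
    }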
This closes #3733
---
.../spark/rdd/CarbonTableCompactor.scala | 36 ++++++++++++++++------
1 file changed, 27 insertions(+), 9 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index b109294..af9a5c1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -18,7 +18,7 @@
package org.apache.carbondata.spark.rdd
import java.util
-import java.util.List
+import java.util.{Collections, List}
import java.util.concurrent.ExecutorService
import scala.collection.JavaConverters._
@@ -41,12 +41,12 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.CarbonInputSplit
-import org.apache.carbondata.indexserver.DistributedRDDUtils
+import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
@@ -92,12 +92,30 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction()
-
+ val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
try {
- scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
+ scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
+ // In case of an exception, clear the cache loaded in the driver and, if enabled,
+ // in the index server. Consider a scenario where a listener is called to load the
+ // SI table after main table compaction: SI loads the newly compacted load of the
+ // main table into the cache in order to select data from it. If the SI load then
+ // fails, that cache entry must be cleared.
+ val compactedLoadToClear = compactedLoad.substring(
+ compactedLoad.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1)
+ if (!CarbonProperties.getInstance()
+ .isDistributedPruningEnabled(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName)) {
+ IndexStoreManager.getInstance()
+ .clearInvalidSegments(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ Collections.singletonList(compactedLoadToClear))
+ } else {
+ IndexServer.getClient
+ .invalidateSegmentCache(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ Array(compactedLoadToClear))
+ }
throw e
}
@@ -133,7 +151,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
* This will submit the loads to be merged into the executor.
*/
def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
- compactedSegments: List[String]): Unit = {
+ compactedSegments: List[String], mergedLoadName: String): Unit = {
loadsToMerge.asScala.foreach { seg =>
LOGGER.info("loads identified for merge is " + seg.getLoadName)
}
@@ -145,10 +163,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
compactionModel.compactionType,
compactionModel.currentPartitions,
compactedSegments)
- triggerCompaction(compactionCallableModel)
+ triggerCompaction(compactionCallableModel, mergedLoadName)
}
- private def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+ private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
+ mergedLoadName: String): Unit = {
val carbonTable = compactionCallableModel.carbonTable
val loadsToMerge = compactionCallableModel.loadsToMerge
val sc = compactionCallableModel.sqlContext
@@ -157,7 +176,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
val partitions = compactionCallableModel.currentPartitions
val tablePath = carbonLoadModel.getTablePath
val startTime = System.nanoTime()
- val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
val mergedLoads = compactionCallableModel.compactedSegments
mergedLoads.add(mergedLoadName)
var finalMergeStatus = false
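For reference, the merged load name handed through to the catch block carries
a prefix ending in an underscore (a "Segment_<id>"-style name; the concrete
value below is an assumed example, not taken from this commit), so the
substring call recovers the bare segment id that the caches are keyed by:

    // Assumed example value for illustration only.
    val compactedLoad = "Segment_0.1"
    val compactedLoadToClear =
      compactedLoad.substring(compactedLoad.lastIndexOf('_') + 1) // "0.1"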