This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a4921e9 [CARBONDATA-4145] Query fails and the message "File does not exist: xxxx.carbondata" is displayed
a4921e9 is described below
commit a4921e9383eefe982df5b6db73a60070765dd1c0
Author: liuhe0702 <[email protected]>
AuthorDate: Wed Mar 10 17:24:42 2021 +0800
[CARBONDATA-4145] Query fails and the message "File does not exist: xxxx.carbondata" is displayed
Why is this PR needed?
If an exception occurs while the refresh index command is running, even after a task has already completed successfully, subsequent queries fail.
Reason: after a compaction task finishes successfully, the old carbondata files are deleted inside the task. If another exception occurs afterwards, the already-deleted files are missing, and queries that still reference them fail.
This PR fixes this issue.
What changes were proposed in this PR?
When all tasks are successful, the driver deletes the old carbondata files (see the sketch below, after this description).
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #4103
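For illustration, below is a minimal driver-side sketch of the approach described above: run the rebuild tasks first, and delete the old carbondata files only after every task has succeeded. This is not the exact project code; the wrapper object, method name and the simplified success check are assumptions, while CarbonSIRebuildRDD, CarbonSparkPartition, FileFactory and the split/getFilePath calls mirror the code touched in the diff below.

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.spark.rdd.CarbonSparkPartition
    import org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD

    // Sketch only: the object and method names are hypothetical, not part of CarbonData.
    object SIRebuildCleanupSketch {

      def rebuildThenCleanUp(siRebuildRDD: CarbonSIRebuildRDD[String, Boolean]): Unit = {
        // Run all rebuild tasks first; with this change the tasks themselves no
        // longer delete the old carbondata files they merged.
        val mergeStatus = siRebuildRDD.collect

        // Only after every task has finished does the driver delete the old data
        // files, partition by partition, so a partial failure never removes files
        // that queries may still need. (In the actual patch this runs on the
        // non-global-sort path once the merge result has been validated.)
        if (null != mergeStatus) {
          siRebuildRDD.partitions.foreach { partition =>
            val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
            carbonSparkPartition.split.value.getAllSplits.asScala.foreach { split =>
              FileFactory.getCarbonFile(split.getFilePath).delete()
            }
          }
        }
      }
    }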
---
.../secondaryindex/rdd/CarbonSIRebuildRDD.scala | 10 ++-------
.../secondaryindex/util/SecondaryIndexUtil.scala | 24 ++++++++++++++++++++--
2 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index e076a5d..90fe242 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -215,7 +215,7 @@ class CarbonSIRebuildRDD[K, V](
// add task completion listener to clean up the resources
context.addTaskCompletionListener { _ =>
- close(splitList)
+ close()
}
try {
// fire a query and get the results.
@@ -271,7 +271,7 @@ class CarbonSIRebuildRDD[K, V](
throw e
}
- private def close(splits: util.List[CarbonInputSplit]): Unit = {
+ private def close(): Unit = {
deleteLocalDataFolders()
// close all the query executor service and clean up memory acquired during query processing
if (null != exec) {
@@ -284,12 +284,6 @@ class CarbonSIRebuildRDD[K, V](
LOGGER.info("Closing compaction processor instance to clean up loading resources")
processor.close()
}
-
- // delete all the old data files which are used for merging
- splits.asScala.foreach { split =>
- val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
- carbonFile.delete()
- }
}
private def deleteLocalDataFolders(): Unit = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index e488514..0977b77 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -66,6 +66,7 @@ import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, Compaction
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.MergeResultImpl
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.rdd.CarbonSparkPartition
object SecondaryIndexUtil {
@@ -190,14 +191,16 @@ object SecondaryIndexUtil {
var rebuiltSegments: Set[String] = Set[String]()
val segmentIdToLoadStartTimeMap: util.Map[String, String] = new util.HashMap()
try {
+ var siRebuildRDD: CarbonSIRebuildRDD[String, Boolean] = null
val mergeStatus = if (SortScope.GLOBAL_SORT ==
indexCarbonTable.getSortScope &&
!indexCarbonTable.getSortColumns.isEmpty) {
mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel,
carbonMergerMapping)
} else {
- new CarbonSIRebuildRDD(sc.sparkSession,
+ siRebuildRDD = new CarbonSIRebuildRDD(sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
- carbonMergerMapping).collect
+ carbonMergerMapping)
+ siRebuildRDD.collect
}
if (null != mergeStatus && mergeStatus.length == 0) {
finalMergeStatus = true
@@ -225,6 +228,11 @@ object SecondaryIndexUtil {
deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
validSegmentsToUse.toList.asJava,
indexCarbonTable)
+ } else {
+ siRebuildRDD.partitions.foreach { partition =>
+ val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
+ deleteOldCarbonDataFiles(carbonSparkPartition)
+ }
}
mergedSegments.asScala.map { seg =>
val file = SegmentFileStore.writeSegmentFile(
@@ -346,6 +354,18 @@ object SecondaryIndexUtil {
}
/**
+ * This method deletes the carbondata files present in a partition during the small
+ * datafile merge after loading a segment to the SI table. They should be deleted only
+ * after the data file merge operation; otherwise, concurrency can cause file-not-found issues.
+ */
+ private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition): Unit = {
+ val splitList = partition.split.value.getAllSplits
+ splitList.asScala.foreach { split =>
+ val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
+ carbonFile.delete()
+ }
+ }
+ /**
* Identifies the group of blocks to be merged based on the merge size.
* This should be per segment grouping.
*