This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a4921e9  [CARBONDATA-4145] Query fails and the message "File does not 
exist: xxxx.carbondata" is displayed
a4921e9 is described below

commit a4921e9383eefe982df5b6db73a60070765dd1c0
Author: liuhe0702 <[email protected]>
AuthorDate: Wed Mar 10 17:24:42 2021 +0800

    [CARBONDATA-4145] Query fails and the message "File does not exist: 
xxxx.carbondata" is
    displayed
    
    Why is this PR needed?
    If an exception occurs while the refresh index command is running, after one 
of its
    tasks has already succeeded, subsequent queries fail.
    Reason: As soon as a compaction task completes successfully, it deletes the 
old carbondata
    files. If another exception occurs afterwards, the deleted files are missing.
    This PR will fix this issue.
    
    What changes were proposed in this PR?
    When all tasks are successful, the driver deletes the old carbondata files.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4103
---
 .../secondaryindex/rdd/CarbonSIRebuildRDD.scala    | 10 ++-------
 .../secondaryindex/util/SecondaryIndexUtil.scala   | 24 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index e076a5d..90fe242 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -215,7 +215,7 @@ class CarbonSIRebuildRDD[K, V](
 
         // add task completion listener to clean up the resources
         context.addTaskCompletionListener { _ =>
-          close(splitList)
+          close()
         }
         try {
           // fire a query and get the results.
@@ -271,7 +271,7 @@ class CarbonSIRebuildRDD[K, V](
           throw e
       }
 
-      private def close(splits: util.List[CarbonInputSplit]): Unit = {
+      private def close(): Unit = {
         deleteLocalDataFolders()
         // close all the query executor service and clean up memory acquired 
during query processing
         if (null != exec) {
@@ -284,12 +284,6 @@ class CarbonSIRebuildRDD[K, V](
           LOGGER.info("Closing compaction processor instance to clean up 
loading resources")
           processor.close()
         }
-
-        // delete all the old data files which are used for merging
-        splits.asScala.foreach { split =>
-          val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
-          carbonFile.delete()
-        }
       }
 
       private def deleteLocalDataFolders(): Unit = {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index e488514..0977b77 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -66,6 +66,7 @@ import 
org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, Compaction
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.MergeResultImpl
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.rdd.CarbonSparkPartition
 
 object SecondaryIndexUtil {
 
@@ -190,14 +191,16 @@ object SecondaryIndexUtil {
     var rebuiltSegments: Set[String] = Set[String]()
     val segmentIdToLoadStartTimeMap: util.Map[String, String] = new 
util.HashMap()
     try {
+      var siRebuildRDD: CarbonSIRebuildRDD[String, Boolean] = null
       val mergeStatus = if (SortScope.GLOBAL_SORT == 
indexCarbonTable.getSortScope &&
       !indexCarbonTable.getSortColumns.isEmpty) {
         mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, 
carbonMergerMapping)
       } else {
-        new CarbonSIRebuildRDD(sc.sparkSession,
+        siRebuildRDD = new CarbonSIRebuildRDD(sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping).collect
+          carbonMergerMapping)
+        siRebuildRDD.collect
       }
       if (null != mergeStatus && mergeStatus.length == 0) {
         finalMergeStatus = true
@@ -225,6 +228,11 @@ object SecondaryIndexUtil {
             deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
               validSegmentsToUse.toList.asJava,
               indexCarbonTable)
+          } else {
+            siRebuildRDD.partitions.foreach { partition =>
+              val carbonSparkPartition = 
partition.asInstanceOf[CarbonSparkPartition]
+              deleteOldCarbonDataFiles(carbonSparkPartition)
+            }
           }
           mergedSegments.asScala.map { seg =>
             val file = SegmentFileStore.writeSegmentFile(
@@ -346,6 +354,18 @@ object SecondaryIndexUtil {
   }
 
   /**
+   * This method deletes the carbondata files present in a partition during 
small
+   * datafile merge after loading a segment to SI table. They should be deleted 
after
+   * data file merge operation, else, concurrency can cause file not found 
issues.
+   */
+  private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition): Unit 
= {
+    val splitList = partition.split.value.getAllSplits
+    splitList.asScala.foreach { split =>
+      val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
+      carbonFile.delete()
+    }
+  }
+  /**
    * Identifies the group of blocks to be merged based on the merge size.
    * This should be per segment grouping.
    *

Reply via email to