This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ab3647  [CARBONDATA-4146] Query fails with the error message "unable to get file status"; the query succeeds after the "drop metacache on table" command is executed.
6ab3647 is described below

commit 6ab364791b814f099de4473a5de2f73fa769de26
Author: liuhe0702 <[email protected]>
AuthorDate: Wed Mar 10 20:54:25 2021 +0800

    [CARBONDATA-4146] Query fails with the error message "unable to get file status";
    the query succeeds after the "drop metacache on table" command is executed.
    
    Why is this PR needed?
    During compaction, the status of the new segment is set to success before the
    index files are merged. After the index files are merged, the original
    carbonindex files are deleted. As a result, query tasks cannot find the
    carbonindex files they have already cached.
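    
    A minimal Scala sketch of the faulty ordering (the helper names below are
    illustrative stand-ins, not the actual CarbonData API):
    
        object OldCompactionFlow {
          // stub: marks the segment SUCCESS, making it visible to queries
          def markSegmentSuccess(segmentId: String): Unit =
            println(s"segment $segmentId is now visible to queries")
          // stub: merges the .carbonindex files and deletes the originals
          def mergeIndexFiles(segmentId: String): Unit =
            println(s"merged indexes of $segmentId; original index files deleted")
    
          def compact(segmentId: String): Unit = {
            markSegmentSuccess(segmentId) // queries may cache the original index files here
            mergeIndexFiles(segmentId)    // ...which are then deleted, so cached entries dangle
          }
        }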
    
    What changes were proposed in this PR?
    Set the status of the new segment to success only after the index files are merged.
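    
    Under the same illustrative stubs as above, the fix simply reorders the two steps:
    
        object NewCompactionFlow {
          import OldCompactionFlow.{markSegmentSuccess, mergeIndexFiles}
    
          def compact(segmentId: String): Unit = {
            mergeIndexFiles(segmentId)    // merge while the segment is still invisible
            markSegmentSuccess(segmentId) // queries can only ever cache the merged index
          }
        }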
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4104
---
 .../spark/rdd/CarbonTableCompactor.scala           | 61 ++++++++++++++--------
 .../org/apache/spark/util/MergeIndexUtil.scala     | 30 ++++++++++-
 .../merger/CompactionResultSortProcessor.java      |  7 ++-
 .../merger/RowResultMergerProcessor.java           |  8 ++-
 4 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 68edd09..3a4df12 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -111,6 +111,9 @@ class CarbonTableCompactor(
         }
       }
       try {
+        // need to be cleared for multiple compactions.
+        // only contains the segmentIds which have to be compacted.
+        compactedSegments.clear()
        scanSegmentsAndSubmitJob(validSegments, compactedSegments, compactedLoad)
       } catch {
         case e: Exception =>
@@ -333,26 +336,46 @@ class CarbonTableCompactor(
 
     if (finalMergeStatus) {
      val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
       var segmentFileName: String = null
+
+      val isMergeIndex = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
+
+      if (compactionType != CompactionType.IUD_DELETE_DELTA && isMergeIndex) {
+        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
+      }
+
       if (carbonTable.isHivePartitionTable) {
-        val readPath =
-          CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
-          CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
-        // Merge all partition files into a single file.
-        segmentFileName =
-          mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
-        val segmentFile = SegmentFileStore
-          .mergeSegmentFiles(readPath,
-            segmentFileName,
-            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
-        if (segmentFile != null) {
-          SegmentFileStore
-            .moveFromTempFolder(segmentFile,
-              carbonLoadModel.getFactTimeStamp + ".tmp",
-              carbonLoadModel.getTablePath)
+        if (isMergeIndex) {
+          val segmentTmpFileName = carbonLoadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+          segmentFileName = mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + segmentTmpFileName
+          val segmentTmpFile = FileFactory.getCarbonFile(
+            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentTmpFileName))
+          if (!segmentTmpFile.renameForce(
+            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))) {
+            throw new Exception(s"Rename segment file from ${segmentTmpFileName} " +
+              s"to ${segmentFileName} failed.")
+          }
+        } else {
+          val readPath =
+            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+              CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
+          // Merge all partition files into a single file.
+          segmentFileName =
+            mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp
+          val mergedSegmetFile = SegmentFileStore
+            .mergeSegmentFiles(readPath,
+              segmentFileName,
+              CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
+          if (mergedSegmetFile != null) {
+            SegmentFileStore
+              .moveFromTempFolder(mergedSegmetFile,
+                carbonLoadModel.getFactTimeStamp + ".tmp",
+                carbonLoadModel.getTablePath)
+          }
+          segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
         }
-        segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
       } else {
         // Get the segment files each updated segment in case of IUD compaction
        val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
@@ -399,10 +422,6 @@ class CarbonTableCompactor(
                             s"${ carbonLoadModel.getTableName }")
       }
 
-      if (compactionType != CompactionType.IUD_DELETE_DELTA) {
-        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
-      }
-
      val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
         carbonTable,
         carbonMergerMapping,
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
index 6b0bbb6..e6a2ad3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
 object MergeIndexUtil {
@@ -43,11 +44,36 @@ object MergeIndexUtil {
     val sparkSession = compactionCallableModel.sqlContext.sparkSession
     if (!carbonTable.isStreamingSink) {
       val mergedSegmentIds = getMergedSegmentIds(mergedLoads)
+      val carbonLoadModel = compactionCallableModel.carbonLoadModel
+      val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, String]()
+      var partitionInfo: java.util.List[String] = null
+      var tempFolderPath: String = null
+      var currPartitionSpec: Option[String] = None
+
+      mergedSegmentIds.foreach{ mergedSegmentId =>
+       segmentFileNameMap.put(mergedSegmentId, String.valueOf(carbonLoadModel.getFactTimeStamp))
+      }
+
+      if (carbonTable.isHivePartitionTable) {
+        tempFolderPath = carbonLoadModel.getFactTimeStamp.toString + ".tmp"
+        if (compactionCallableModel.compactedPartitions != null) {
+          val partitionSpecs = compactionCallableModel.compactedPartitions.getOrElse(List.empty)
+          currPartitionSpec = Some(ObjectSerializationUtil.convertObjectToString(
+            new util.ArrayList(partitionSpecs.asJava)))
+          partitionInfo = partitionSpecs.map(_.getLocation.toString).asJava
+        }
+      }
+
       CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
         mergedSegmentIds,
-        SegmentStatusManager.mapSegmentToStartTime(carbonTable),
+        segmentFileNameMap,
         carbonTable.getTablePath,
-        carbonTable, false)
+        carbonTable,
+        false,
+        partitionInfo,
+        tempFolderPath,
+        false,
+        currPartitionSpec)
     }
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 3f27eee..2efd56b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -190,7 +190,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       LOGGER.error(e.getLocalizedMessage(), e);
       throw e;
     } finally {
-      if (partitionSpec != null) {
+      boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+      // if mergeIndex is true, the segment file does not need to be written here;
+      // it will be written while merging the index files
+      if (partitionSpec != null && !isMergeIndex) {
         try {
           SegmentFileStore
              .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index c044257..6d8cae7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.exception.SliceMergerException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.sort.CarbonPriorityQueue;
@@ -185,7 +186,12 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
         if (isDataPresent) {
           this.dataHandler.closeHandler();
         }
-        if (partitionSpec != null) {
+        boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+            CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+        // if mergeIndex is true, the segment file does not need to be written here;
+        // it will be written while merging the index files
+        if (partitionSpec != null && !isMergeIndex) {
          SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
              partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
               partitionSpec.getPartitions());
