QiangCai commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r553073502



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
       
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, 
operationContext)
     }
 
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+      .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+    if (deltaFiles.length > 0) {
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, 
collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = 
tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = 
deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = 
CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      val timestamp = System.currentTimeMillis().toString
+      val segmentDetails = new util.HashSet[Segment]()
+      segmentDetails.add(model.getSegment)
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, 
timestamp, false)

Review comment:
       Can we write the tablestatus file only once during the add-load command, instead of updating it multiple times?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to