Karan980 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r566840116



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
     }
 
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+      .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+    if (deltaFiles.length > 0) {
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      val timestamp = System.currentTimeMillis().toString
+      val segmentDetails = new util.HashSet[Segment]()
+      segmentDetails.add(model.getSegment)
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, timestamp, false)

Review comment:
       I have analyzed this and found that before writing the new
tableUpdateStatus file, we look up the segment entries in the tableStatus
file and write entries into the new tableUpdateStatus file only for those
segments that are present in the tableStatus file. So if the segment being
added does not yet have an entry in the tableStatus file, its corresponding
entry will not be written to the new tableUpdateStatus file either. For this
scenario it is therefore required to write the tableStatus file twice.
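       To make the ordering issue concrete, here is a minimal, self-contained
Scala sketch of the behaviour described above. The types and the
`writeUpdateStatus` helper are simplified stand-ins, not CarbonData's actual
`CarbonUpdateUtil` code: update entries survive only for segments already
recorded in tableStatus, which is why the new segment's tableStatus entry has
to be written before `updateSegmentStatus` runs.

```scala
// Minimal sketch (simplified stand-ins, not CarbonData's real types or the
// actual CarbonUpdateUtil implementation) of the filtering described above:
// entries land in the new tableUpdateStatus file only for segments that are
// already present in tableStatus.
object UpdateStatusSketch extends App {
  case class TableStatusEntry(segmentId: String)
  case class UpdateStatusEntry(segmentId: String, blockName: String)

  // Hypothetical helper: drop update entries whose segment is unknown.
  def writeUpdateStatus(
      tableStatus: Seq[TableStatusEntry],
      updates: Seq[UpdateStatusEntry]): Seq[UpdateStatusEntry] = {
    val knownSegments = tableStatus.map(_.segmentId).toSet
    updates.filter(u => knownSegments.contains(u.segmentId))
  }

  val existing = Seq(TableStatusEntry("0"), TableStatusEntry("1"))
  val newDelta = Seq(UpdateStatusEntry("2", "part-0-2"))

  // Before the first tableStatus write, the delete-delta entry for the
  // newly added segment "2" is silently dropped.
  println(writeUpdateStatus(existing, newDelta))  // List()

  // After tableStatus is written once with the new segment, the same call
  // keeps the entry -- hence the second tableStatus write.
  println(writeUpdateStatus(existing :+ TableStatusEntry("2"), newDelta))
  // List(UpdateStatusEntry(2,part-0-2))
}
```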



