Karan980 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r566840116
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent,
operationContext)
}
+ val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+ .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+ if (deltaFiles.length > 0) {
+       val blockNameToDeltaFilesMap =
+         collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+ deltaFiles.foreach { deltaFile =>
+ val tmpDeltaFilePath = deltaFile.getAbsolutePath
+ .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+ CarbonCommonConstants.FILE_SEPARATOR)
+         val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+ if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+           val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+ val blockName = CarbonTablePath.DataFileUtil
+ .getBlockNameFromDeleteDeltaFile(deltaFileName)
+ if (blockNameToDeltaFilesMap.contains(blockName)) {
+ blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+ } else {
+ val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+ deltaFileList += ((deltaFile, deltaFileName))
+ blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+ }
+ }
+ }
+ val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+       val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+ blockNameToDeltaFilesMap.foreach { entry =>
+ val segmentUpdateDetail = new SegmentUpdateDetails()
+ segmentUpdateDetail.setBlockName(entry._1)
+ segmentUpdateDetail.setActualBlockName(
+ entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+ CarbonCommonConstants.FACT_FILE_EXT)
+ segmentUpdateDetail.setSegmentName(model.getSegmentId)
+ setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+ segmentUpdateDetails.add(segmentUpdateDetail)
+ }
+ val timestamp = System.currentTimeMillis().toString
+ val segmentDetails = new util.HashSet[Segment]()
+ segmentDetails.add(model.getSegment)
+     CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, timestamp, false)
Review comment:
I have analyzed this and found that, before writing the new tableUpdateStatus
file, we look up the segment entries in the tableStatus file and then write
data to the updateTableStatus file only for those segments that are present in
the tableStatus file. So, if the tableStatus file does not yet contain an entry
for the newly added segment (the segment we are adding now), its corresponding
entry will not be written to the new tableUpdateStatus file either. For this
scenario, it is therefore required to write the tableStatus file twice.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]