This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5b101ec [CARBONDATA-3594] Optimize getSplits() during compaction
5b101ec is described below
commit 5b101ec781286ab98516eb6d1f327b7a36c4b8c7
Author: Indhumathi27 <[email protected]>
AuthorDate: Fri Nov 22 16:26:10 2019 +0530
[CARBONDATA-3594] Optimize getSplits() during compaction
Problem:
In MergerRDD, when a compaction task covers n segments, getSplits() is
called n times, once per segment.

Solution:
In MergerRDD, for each compaction task, collect all valid segments first and
call getSplits() only once for the whole set of valid segments.
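
For illustration, a minimal self-contained Scala sketch of the before/after
call pattern. Segment, InputSplit and getSplits below are toy stand-ins, not
the CarbonData API; the real code drives CarbonTableInputFormat.getSplits(job)
once, with INPUT_SEGMENT_NUMBERS set to the comma-joined valid segment ids.

// Toy model of the change: count how many getSplits() calls each strategy makes.
object GetSplitsSketch {
  // Hypothetical stand-ins; the real types come from CarbonData/Hadoop.
  final case class Segment(segmentNo: String)
  final case class InputSplit(segmentNo: String, path: String)

  private var getSplitsCalls = 0

  // Models one getSplits() invocation; the saving comes from per-call
  // overhead (driver-side metadata reads), so call count is what matters.
  private def getSplits(segmentNos: Seq[String]): Seq[InputSplit] = {
    getSplitsCalls += 1
    segmentNos.map(no => InputSplit(no, s"/store/segment_$no/part-0.carbondata"))
  }

  def main(args: Array[String]): Unit = {
    val validSegments = Seq(Segment("0"), Segment("1"), Segment("2"))

    // Before: getSplits() called once per segment, i.e. n calls for n segments.
    val before = validSegments.flatMap(seg => getSplits(Seq(seg.segmentNo)))
    println(s"before: $getSplitsCalls calls, ${before.size} splits")

    // After: collect all valid segment ids first, then one getSplits() call,
    // mirroring INPUT_SEGMENT_NUMBERS = validSegIds.mkString(",").
    getSplitsCalls = 0
    val validSegIds = validSegments.map(_.segmentNo)
    val after = getSplits(validSegIds)
    println(s"after: $getSplitsCalls call, ${after.size} splits")
  }
}

Under this model the first strategy makes n getSplits() calls and the second
exactly one, which is the saving this patch targets.
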
This closes #3475
---
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 76 ++++++++++++----------
1 file changed, 41 insertions(+), 35 deletions(-)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index febaeca..5e33ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -343,7 +343,7 @@ class CarbonMergerRDD[K, V](
 
     var noOfBlocks = 0
     val taskInfoList = new java.util.ArrayList[Distributable]
-    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+    var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]()
     var allSplits = new java.util.ArrayList[InputSplit]
 
     var splitsOfLastSegment: List[CarbonInputSplit] = null
@@ -359,6 +359,8 @@ class CarbonMergerRDD[K, V](
       loadMetadataDetails = SegmentStatusManager
         .readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
     }
+
+    val validSegIds: java.util.List[String] = new util.ArrayList[String]()
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
       // In case of range column get the size for calculation of number of ranges
@@ -369,44 +371,48 @@ class CarbonMergerRDD[K, V](
           }
         }
       }
+      validSegIds.add(eachSeg.getSegmentNo)
+    }
-      // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
-
+    // map for keeping the relation of a task and its blocks.
+    job.getConfiguration
+      .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
+
+    val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
+    // get splits
+    val splits = format.getSplits(job)
+
+    // keep on assigning till last one is reached.
+    if (null != splits && splits.size > 0) {
+      splitsOfLastSegment = splits.asScala
+        .map(_.asInstanceOf[CarbonInputSplit])
+        .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
+    }
+    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
+      val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
+      val blockInfo = new TableBlockInfo(entry.getFilePath,
+        entry.getStart, entry.getSegmentId,
+        entry.getLocations, entry.getLength, entry.getVersion,
+        updateStatusManager.getDeleteDeltaFilePath(
+          entry.getFilePath,
+          segmentId)
+      )
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-        updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
-      }
-
-      val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
-      // get splits
-      val splits = format.getSplits(job)
-
-      // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala
-          .map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-      }
-      val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
-        val blockInfo = new TableBlockInfo(entry.getFilePath,
-          entry.getStart, entry.getSegmentId,
-          entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(
-            entry.getFilePath,
-            Segment.toSegment(entry.getSegmentId).getSegmentNo)
-        )
-        (!updated || (updated && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
-            updateDetails, updateStatusManager)))) &&
-        FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+        updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
       }
-      if (rangeColumn != null) {
-        totalTaskCount = totalTaskCount +
-          CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
-      }
-      carbonInputSplits ++:= filteredSplits
-      allSplits.addAll(filteredSplits.asJava)
+      // filter splits with V3 data file format
+      // if split is updated, then check for if it is valid segment based on update details
+      (!updated ||
+      (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+        updateDetails, updateStatusManager)))) &&
+      FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+    }
+    if (rangeColumn != null) {
+      totalTaskCount = totalTaskCount +
+        CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
+    }
+    carbonInputSplits ++= filteredSplits
+    allSplits.addAll(filteredSplits.asJava)
 
     totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)