ajantha-bhat commented on a change in pull request #3701: [CARBONDATA-3770]
Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r407411990
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -108,4 +125,145 @@ case class CarbonCountStar(
CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
(job, carbonInputFormat)
}
+
+ // The query flow for a pure partition count star is as follows:
+ // Step 1. check by the filter whether this is a pure partition count star
+ // Step 2. read tablestatus to get all valid segments; remove the segment file cache of
+ // invalid and expired segments
+ // Step 3. use multiple threads to read the segment files that are not in the cache, and
+ // cache the index file list of each segment in memory; if a segment's index files are
+ // already cached, they are not read again
+ // Step 4. use multiple threads to prune segments and partitions to get the pruned index
+ // file list, which eliminates most index files and reduces the number of files to read
+ // Step 5. read the count directly from each pruned index file and cache it; take it from
+ // the cache if it already exists in the index_file <-> rowCount map
+ private def getRowCountPurePartitionPrune: Long = {
+ var rowCount: Long = 0
+ val prunedPartitionPaths = new java.util.ArrayList[String]()
+ // Get the current partitions from table.
+ val partitions = CarbonFilters.getPrunedPartitions(relation, predicates)
+ if (partitions != null) {
+ for (partition <- partitions) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+ val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+ carbonTable.getTableId)
+ if (!tableSegmentIndexes.isEmpty) {
+ // clear invalid cache
+ for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+ if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+ // means invalid cache
+ tableSegmentIndexes.remove(segmentFilePathInCache)
+ }
+ }
+ }
+ // initialize cache entries for valid segments that are not already cached
+ for (validSegmentPath <- validSegmentPaths) {
+ if (tableSegmentIndexes.get(validSegmentPath) == null) {
+ val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+ tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+ }
+ }
+
+ val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 4)
Review comment:
We already pass the partition info to the prune method so that min/max is loaded only for
the matched partitions (#3568).
I can understand that you want to avoid loading min/max as well, since this is just a
count(*).
But keeping a new cache (tableSegmentIndexes) and multithreading in the driver just for
that is not efficient: the driver's memory usage will grow, and concurrent queries will be
impacted by the extra threads.
I still feel that loading min/max for count(*) is fine, as it helps other queries.
Let's see what others think.
@kunal642, @QiangCai, @jackylk
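[Editor's note] The Step 1-5 flow in the quoted diff (a bounded thread pool reading per-index-file row counts, memoized in a map) can be sketched roughly as follows. This is a minimal illustration, not CarbonData code: `PartitionCountSketch`, `rowCountCache`, `readCountFromIndexFile`, and `countStar` are hypothetical names, and the index reader just returns a fixed placeholder count instead of parsing a real index file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionCountSketch {
    // Hypothetical cache mirroring the PR's index_file <-> rowCount map (Step 5).
    private final Map<String, Long> rowCountCache = new ConcurrentHashMap<>();

    // Hypothetical reader; real code would read the count from the index file footer.
    long readCountFromIndexFile(String indexFilePath) {
        return 1000L; // fixed placeholder count for illustration
    }

    long countStar(List<String> prunedIndexFiles) {
        // Bound driver-side concurrency the same way the diff does:
        // numThreads = min(max(n, 1), 4).
        int numThreads = Math.min(Math.max(prunedIndexFiles.size(), 1), 4);
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (String file : prunedIndexFiles) {
                // Each task reads the count only if it is not already cached.
                futures.add(pool.submit(() ->
                    rowCountCache.computeIfAbsent(file, this::readCountFromIndexFile)));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The reviewer's concern maps directly onto this sketch: both the `rowCountCache` map and the thread pool live in the driver, so their memory and CPU cost is paid by every concurrent query.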
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services