Zhangshunyu commented on a change in pull request #3701: [WIP] improve pure
partition count star performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r407285145
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
##########
@@ -55,34 +67,190 @@ case class CarbonCountStar(
val (job, tableInputFormat) =
createCarbonInputFormat(absoluteTableIdentifier)
CarbonInputFormat.setQuerySegment(job.getConfiguration, carbonTable)
- // get row count
- var rowCount = CarbonUpdateUtil.getRowCount(
- tableInputFormat.getBlockRowCount(
- job,
- carbonTable,
- CarbonFilters.getPartitions(
- Seq.empty,
- sparkSession,
- TableIdentifier(
- carbonTable.getTableName,
- Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
- carbonTable)
+ val prunedPartitionPaths = new java.util.ArrayList[String]()
+ var totalRowCount: Long = 0
+ if (predicates.nonEmpty) {
+ val names = relation.catalogTable match {
+ case Some(table) => table.partitionColumnNames
+ case _ => Seq.empty
+ }
+ // Get the current partitions from table.
+ var partitions: java.util.List[PartitionSpec] = null
+ if (names.nonEmpty) {
+ val partitionSet = AttributeSet(names
+ .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
+ val partitionKeyFilters = CarbonToSparkAdapter
+ .getPartitionKeyFilter(partitionSet, predicates)
+ // Update the name with lower case as it is case sensitive while getting partition info.
+ val updatedPartitionFilters = partitionKeyFilters.map { exp =>
+ exp.transform {
+ case attr: AttributeReference =>
+ CarbonToSparkAdapter.createAttributeReference(
+ attr.name.toLowerCase,
+ attr.dataType,
+ attr.nullable,
+ attr.metadata,
+ attr.exprId,
+ attr.qualifier)
+ }
+ }
+ partitions =
+ CarbonFilters.getPartitions(
+ updatedPartitionFilters.toSeq,
+ SparkSession.getActiveSession.get,
+ relation.catalogTable.get.identifier).orNull.asJava
+ if (partitions != null) {
+ for (partition <- partitions.asScala) {
+ prunedPartitionPaths.add(partition.getLocation.toString)
+ }
+ val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val validSegmentPaths = details.filter(segment =>
+ ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+ (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+ && segment.getSegmentFile != null).map(segment => segment.getSegmentFile)
+ val tableSegmentIndexes = DataMapStoreManager.getInstance().getAllSegmentIndexes(
+ carbonTable.getTableId)
+ if (!tableSegmentIndexes.isEmpty) {
+ // clear invalid cache
+ for (segmentFilePathInCache <- tableSegmentIndexes.keySet().asScala) {
+ if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+ // means invalid cache
+ tableSegmentIndexes.remove(segmentFilePathInCache)
+ }
+ }
+ }
+ // init and put absent the valid cache
+ for (validSegmentPath <- validSegmentPaths) {
+ if (tableSegmentIndexes.get(validSegmentPath) == null) {
+ val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+ tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+ }
+ }
+
+ val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 10)
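The invalidate-then-populate cache maintenance in the hunk above can be sketched on its own. This is a minimal, illustrative model: `SegmentIndexCache` and this simplified `SegmentIndexMeta` are stand-ins, not the actual CarbonData classes, and the real cache is keyed and populated through `DataMapStoreManager`.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Illustrative stand-in for the per-segment index metadata cached by the patch.
case class SegmentIndexMeta(segmentFilePath: String)

object SegmentIndexCache {
  // Shared cache keyed by segment file path, as in getAllSegmentIndexes.
  private val cache = new ConcurrentHashMap[String, SegmentIndexMeta]()

  // First drop cached entries whose segment is no longer valid, then make sure
  // every valid segment has an entry; putIfAbsent keeps existing metadata warm.
  def refresh(validSegmentPaths: Seq[String]): ConcurrentHashMap[String, SegmentIndexMeta] = {
    val valid = validSegmentPaths.toSet
    for (cachedPath <- cache.keySet().asScala.toList) {
      if (!valid.contains(cachedPath)) {
        cache.remove(cachedPath) // invalid cache entry
      }
    }
    for (path <- validSegmentPaths) {
      cache.putIfAbsent(path, SegmentIndexMeta(path))
    }
    cache
  }

  // Bound the reader pool between 1 and 10 threads, mirroring the patch.
  def numThreads(segmentCount: Int): Int = math.min(math.max(segmentCount, 1), 10)
}
```

Snapshotting the key set with `.toList` before removing avoids relying on the iterator's removal semantics, though `ConcurrentHashMap` iterators tolerate concurrent modification.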
Review comment:
> @Zhangshunyu : What do you mean by "pure partition"? Is it just a normal
"partition"?
> Also, please mention in the description what the bottleneck was before.
@ajantha-bhat
We found that select count(*) on certain partitions is time-costly and
performs worse than Parquet. Currently, a count(*) whose filter columns are
all partition columns loads all the datamaps of those partitions, including
block info and min/max info, but loading them is unnecessary: since the row
count is stored inside the index files, we can read it directly from the
valid index files after partition pruning, and we can cache this info. For a
no-sort partition table, min/max is almost useless but still costs time to load.
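The fast path described above can be modeled minimally: after partition pruning, sum the per-partition row counts instead of loading block/min-max datamaps. The `Map` here is an illustrative stand-in for row counts parsed from CarbonData index files; `PartitionCountStar` is a hypothetical name, not part of the patch.

```scala
object PartitionCountStar {
  // Sum the row counts of only the partitions that survived pruning.
  // rowCountByPartitionPath stands in for counts read from index files.
  def countStar(rowCountByPartitionPath: Map[String, Long],
                prunedPartitionPaths: Set[String]): Long = {
    rowCountByPartitionPath
      .filter { case (path, _) => prunedPartitionPaths.contains(path) }
      .values
      .sum
  }
}
```

The point of the patch is that this sum needs only the row counts already recorded per segment/partition, so block-level and min/max metadata never has to be loaded for a pure partition-filter count(*).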
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services