Zhangshunyu commented on a change in pull request #3701: [WIP]  improve pure 
partition count star performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r407285145
 
 

 ##########
 File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
 ##########
 @@ -55,34 +67,190 @@ case class CarbonCountStar(
     val (job, tableInputFormat) = 
createCarbonInputFormat(absoluteTableIdentifier)
     CarbonInputFormat.setQuerySegment(job.getConfiguration, carbonTable)
 
-    // get row count
-    var rowCount = CarbonUpdateUtil.getRowCount(
-      tableInputFormat.getBlockRowCount(
-        job,
-        carbonTable,
-        CarbonFilters.getPartitions(
-          Seq.empty,
-          sparkSession,
-          TableIdentifier(
-            carbonTable.getTableName,
-            Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
-      carbonTable)
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    var totalRowCount: Long = 0
+    if (predicates.nonEmpty) {
+      val names = relation.catalogTable match {
+        case Some(table) => table.partitionColumnNames
+        case _ => Seq.empty
+      }
+      // Get the current partitions from table.
+      var partitions: java.util.List[PartitionSpec] = null
+      if (names.nonEmpty) {
+        val partitionSet = AttributeSet(names
+          .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
+        val partitionKeyFilters = CarbonToSparkAdapter
+          .getPartitionKeyFilter(partitionSet, predicates)
+        // Update the name with lower case as it is case sensitive while 
getting partition info.
+        val updatedPartitionFilters = partitionKeyFilters.map { exp =>
+          exp.transform {
+            case attr: AttributeReference =>
+              CarbonToSparkAdapter.createAttributeReference(
+                attr.name.toLowerCase,
+                attr.dataType,
+                attr.nullable,
+                attr.metadata,
+                attr.exprId,
+                attr.qualifier)
+          }
+        }
+        partitions =
+          CarbonFilters.getPartitions(
+            updatedPartitionFilters.toSeq,
+            SparkSession.getActiveSession.get,
+            relation.catalogTable.get.identifier).orNull.asJava
+        if (partitions != null) {
+          for (partition <- partitions.asScala) {
+            prunedPartitionPaths.add(partition.getLocation.toString)
+          }
+          val details = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val validSegmentPaths = details.filter(segment =>
+            ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+              (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+              && segment.getSegmentFile != null).map(segment => 
segment.getSegmentFile)
+          val tableSegmentIndexes = 
DataMapStoreManager.getInstance().getAllSegmentIndexes(
+            carbonTable.getTableId)
+          if (!tableSegmentIndexes.isEmpty) {
+            // clear invalid cache
+            for (segmentFilePathInCache <- 
tableSegmentIndexes.keySet().asScala) {
+              if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+                // means invalid cache
+                tableSegmentIndexes.remove(segmentFilePathInCache)
+              }
+            }
+          }
+          // init and put absent the valid cache
+          for (validSegmentPath <- validSegmentPaths) {
+            if (tableSegmentIndexes.get(validSegmentPath) == null) {
+              val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+              tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+            }
+          }
+
+          val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 10)
 
 Review comment:
   > @Zhangshunyu : What do you mean pure partition ? is it just normal 
"partition" ?
   > Also mention what was the bottleneck before in the description.
   @ajantha-bhat 
   
   We found that select count(*) over some partitions is time-consuming and 
slower than Parquet. Currently, a count(*) with a filter whose columns are all 
partition columns loads all datamaps of those partitions, including block 
info and min/max info. There is no need to load them: after partition pruning 
we can read the row count directly from the valid index files, because the 
rowCount is stored inside the index files, and we can cache this info. For a 
no-sort partition table, min/max info is of almost no use, yet loading it 
costs time. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to