Zhangshunyu commented on a change in pull request #3701: [CARBONDATA-3770] 
Improve partition count star query performance
URL: https://github.com/apache/carbondata/pull/3701#discussion_r407362944
 
 

 ##########
 File path: 
integration/spark/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
 ##########
 @@ -55,34 +67,190 @@ case class CarbonCountStar(
     val (job, tableInputFormat) = 
createCarbonInputFormat(absoluteTableIdentifier)
     CarbonInputFormat.setQuerySegment(job.getConfiguration, carbonTable)
 
-    // get row count
-    var rowCount = CarbonUpdateUtil.getRowCount(
-      tableInputFormat.getBlockRowCount(
-        job,
-        carbonTable,
-        CarbonFilters.getPartitions(
-          Seq.empty,
-          sparkSession,
-          TableIdentifier(
-            carbonTable.getTableName,
-            Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
-      carbonTable)
+    val prunedPartitionPaths = new java.util.ArrayList[String]()
+    var totalRowCount: Long = 0
+    if (predicates.nonEmpty) {
+      val names = relation.catalogTable match {
+        case Some(table) => table.partitionColumnNames
+        case _ => Seq.empty
+      }
+      // Get the current partitions from table.
+      var partitions: java.util.List[PartitionSpec] = null
+      if (names.nonEmpty) {
+        val partitionSet = AttributeSet(names
+          .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
+        val partitionKeyFilters = CarbonToSparkAdapter
+          .getPartitionKeyFilter(partitionSet, predicates)
+        // Update the name with lower case as it is case sensitive while 
getting partition info.
+        val updatedPartitionFilters = partitionKeyFilters.map { exp =>
+          exp.transform {
+            case attr: AttributeReference =>
+              CarbonToSparkAdapter.createAttributeReference(
+                attr.name.toLowerCase,
+                attr.dataType,
+                attr.nullable,
+                attr.metadata,
+                attr.exprId,
+                attr.qualifier)
+          }
+        }
+        partitions =
+          CarbonFilters.getPartitions(
+            updatedPartitionFilters.toSeq,
+            SparkSession.getActiveSession.get,
+            relation.catalogTable.get.identifier).orNull.asJava
+        if (partitions != null) {
+          for (partition <- partitions.asScala) {
+            prunedPartitionPaths.add(partition.getLocation.toString)
+          }
+          val details = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val validSegmentPaths = details.filter(segment =>
+            ((segment.getSegmentStatus == SegmentStatus.SUCCESS) ||
+              (segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+              && segment.getSegmentFile != null).map(segment => 
segment.getSegmentFile)
+          val tableSegmentIndexes = 
DataMapStoreManager.getInstance().getAllSegmentIndexes(
+            carbonTable.getTableId)
+          if (!tableSegmentIndexes.isEmpty) {
+            // clear invalid cache
+            for (segmentFilePathInCache <- 
tableSegmentIndexes.keySet().asScala) {
+              if (!validSegmentPaths.contains(segmentFilePathInCache)) {
+                // means invalid cache
+                tableSegmentIndexes.remove(segmentFilePathInCache)
+              }
+            }
+          }
+          // init and put absent the valid cache
+          for (validSegmentPath <- validSegmentPaths) {
+            if (tableSegmentIndexes.get(validSegmentPath) == null) {
+              val segmentIndexMeta = new SegmentIndexMeta(validSegmentPath)
+              tableSegmentIndexes.put(validSegmentPath, segmentIndexMeta)
+            }
+          }
+
+          val numThreads = Math.min(Math.max(validSegmentPaths.length, 1), 10)
 
 Review comment:
   @ajantha-bhat OK, I will reduce the core pool size.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to