This is an automated email from the ASF dual-hosted git repository. qiangcai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new bfc9533 [HOTFIX] Avoid calling SecondaryIndexUtil.readFileFooter() for every splits identified during SI creation bfc9533 is described below commit bfc9533ea479e9051a3ce67a58dae63c9af14b42 Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Fri Nov 6 14:31:34 2020 +0530 [HOTFIX] Avoid calling SecondaryIndexUtil.readFileFooter() for every splits identified during SI creation Why is this PR needed? 1. Redundant work: SecondaryIndexUtil.readFileFooter() is called for every identified split during SI creation, although its result is not needed there. 2. Some info logs are emitted inside loops; they can be changed to debug level to avoid excessive logging. What changes were proposed in this PR? 1. Remove the SecondaryIndexUtil.readFileFooter() call from SI creation. 2. Change some info logs inside loops to debug level to avoid excessive logging. Does this PR introduce any user interface change? No. Is any new testcase added? No. This closes #4006 --- .../rdd/CarbonSecondaryIndexRDD.scala | 48 ++++++++++------------ 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala index 9196c9e..2b31995 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala @@ -185,6 +185,7 @@ class CarbonSecondaryIndexRDD[K, V]( } override def internalGetPartitions: Array[Partition] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( carbonStoreLocation, databaseName, factTableName, tableId) @@ -255,7 +256,7 @@ class CarbonSecondaryIndexRDD[K, V]( val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]() val nodes = 
DistributionUtil.getNodeList(sparkContext) - logInfo("no.of.nodes where data present=" + nodeBlockMap.size()) + LOGGER.info("no.of.nodes where data present=" + nodeBlockMap.size()) defaultParallelism = sparkContext.defaultParallelism // Create Spark Partition for each task and assign blocks @@ -278,34 +279,29 @@ class CarbonSecondaryIndexRDD[K, V]( } } } - - // print the node info along with task and number of blocks for the task. - nodeTaskBlocksMap.asScala.foreach((entry: (String, util.List[NodeInfo])) => { - logInfo(s"for the node ${ entry._1 }") - for (elem <- entry._2.asScala) { - logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks) - } - }) - + if (LOGGER.isDebugEnabled) { + // print the node info along with task and number of blocks for the task. + nodeTaskBlocksMap.asScala.foreach((entry: (String, util.List[NodeInfo])) => { + LOGGER.debug(s"for the node ${ entry._1 }") + for (elem <- entry._2.asScala) { + LOGGER.debug("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks) + } + }) + } val noOfNodes = nodes.length val noOfTasks = result.size - logInfo(s"Identified no.of.Blocks: $noOfBlocks," + - s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: " + - s"$noOfTasks") - logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime)) - for (j <- 0 until result.size) { - val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value - val splitList = multiBlockSplit.getAllSplits - val tableBlocks: util.List[TableBlockInfo] = CarbonInputSplit.createBlocks(splitList) - val tableBlocksSize: Int = tableBlocks.size - if (tableBlocksSize > 0) { - // read the footer and get column cardinality which will be same for all tasks in a - // segment - val dataFileFooter: DataFileFooter = SecondaryIndexUtil - .readFileFooter(tableBlocks.get(tableBlocks.size() - 1)) + LOGGER.info(s"Identified no.of.Blocks: $noOfBlocks," + + s"parallelism: $defaultParallelism , 
no.of.nodes: $noOfNodes, no.of.tasks: " + + s"$noOfTasks") + LOGGER.info( + "Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime)) + if (LOGGER.isDebugEnabled) { + for (j <- 0 until result.size) { + val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value + val splitList = multiBlockSplit.getAllSplits + LOGGER.debug(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " + + s"${ CarbonInputSplit.createBlocks(splitList).size }") } - logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " + - s"${ CarbonInputSplit.createBlocks(splitList).size }") } result.toArray(new Array[Partition](result.size)) } else {