Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 591a4896c -> 2c9cd7105


Fixed query statistics for queries with limit


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/feda950c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/feda950c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/feda950c

Branch: refs/heads/master
Commit: feda950c6170af95d451097f2dfa8e322a06c93b
Parents: 591a489
Author: foryou2030 <foryou2...@126.com>
Authored: Tue Sep 20 13:29:14 2016 +0800
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Sep 20 18:25:21 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 43 ++++++++------------
 1 file changed, 16 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/feda950c/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index f2cfd81..e676687 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -187,6 +187,10 @@ class CarbonScanRDD[V: ClassTag](
       var rowIterator: CarbonIterator[Array[Any]] = _
       var queryStartTime: Long = 0
       try {
+        context.addTaskCompletionListener(context => {
+          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
+          logStatistics()
+        })
         val carbonSparkPartition = 
thepartition.asInstanceOf[CarbonSparkPartition]
         if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
           queryModel.setQueryId(queryModel.getQueryId + "_" + 
carbonSparkPartition.idx)
@@ -225,23 +229,6 @@ class CarbonScanRDD[V: ClassTag](
           finished = (null == rowIterator) || (!rowIterator.hasNext)
           havePair = !finished
         }
-        if (finished) {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-          if (null != queryModel.getStatisticsRecorder) {
-            var queryStatistic = new QueryStatistic()
-            queryStatistic
-              .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-                System.currentTimeMillis - queryStartTime
-              )
-            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-            // result size
-            queryStatistic = new QueryStatistic()
-            
queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, 
recordCount)
-            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-            // print executor query statistics for each task_id
-            queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
-          }
-        }
         !finished
       }
 
@@ -251,8 +238,17 @@ class CarbonScanRDD[V: ClassTag](
         }
         havePair = false
         recordCount += 1
-        if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
+        keyClass.getValue(rowIterator.next())
+      }
+
+      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, 
Dictionary]) = {
+        if (null != columnToDictionaryMap) {
+          org.apache.carbondata.spark.util.CarbonQueryUtil
+            .clearColumnDictionaryCache(columnToDictionaryMap)
+        }
+      }
+
+      def logStatistics(): Unit = {
           if (null != queryModel.getStatisticsRecorder) {
             var queryStatistic = new QueryStatistic()
             queryStatistic
@@ -268,15 +264,8 @@ class CarbonScanRDD[V: ClassTag](
             queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
           }
         }
-        keyClass.getValue(rowIterator.next())
       }
-      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, 
Dictionary]) = {
-        if (null != columnToDictionaryMap) {
-          org.apache.carbondata.spark.util.CarbonQueryUtil
-            .clearColumnDictionaryCache(columnToDictionaryMap)
-        }
-      }
-    }
+
     iter
   }
 

Reply via email to