kunal642 commented on a change in pull request #3678: [WIP]: index server 
concurrency fix
URL: https://github.com/apache/carbondata/pull/3678#discussion_r398329624
 
 

 ##########
 File path: 
integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
 ##########
 @@ -96,20 +99,78 @@ class DistributedCountRDD(@transient ss: SparkSession, 
dataMapFormat: Distributa
     new DistributedPruneRDD(ss, dataMapFormat).partitions
   }
 
-  private def generateFuture(split: Seq[InputSplit])
+  private def generateFuture(split: Seq[InputSplit], globalQueue: 
SegmentProcessor)
     (implicit executionContext: ExecutionContext) = {
     Future {
-      val segments = split.map { inputSplit =>
+
+      var segmentsWorkStatus = split.map { inputSplit =>
         val distributable = 
inputSplit.asInstanceOf[DataMapDistributableWrapper]
         distributable.getDistributable.getSegment
           .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
-        distributable.getDistributable.getSegment
+
+        val processedSegments = 
globalQueue.ifProcessSegment(distributable.getDistributable
+          .getSegment.getSegmentNo, dataMapFormat.getCarbonTable.getTableId)
+
+        val segmentWorkStatusList = new 
SegmentWorkStatus(distributable.getDistributable
+          .getSegment, !processedSegments)
+
+        // if ifprocesssegment = true, iswaiting = false
+        val processedSegmentsList = 
globalQueue.processSegment(segmentWorkStatusList,
+          dataMapFormat.getCarbonTable.getTableId)
+        segmentWorkStatusList
+      }
+
+      val queueSize = globalQueue.queueSize()
+      val getGlobalworkQueue = globalQueue.getGlobalWorkQueue
+
+      var segmentsPositive: mutable.HashSet[SegmentWorkStatus] = 
mutable.HashSet.empty
+      var segmentsNegative: mutable.HashSet[SegmentWorkStatus] = 
mutable.HashSet.empty
+
+      segmentsWorkStatus.map { iter =>
+        if (iter.getWaiting == false) {
+          segmentsPositive.add(iter)
+        } else {
+          segmentsNegative.add(iter)
+        }
       }
+
       val defaultDataMap = DataMapStoreManager.getInstance
         .getDataMap(dataMapFormat.getCarbonTable, split.head
           
.asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
-      defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
-        .getPartitions, defaultDataMap).asScala
+      var result = defaultDataMap.getBlockRowCount(segmentsPositive.map { iter 
=>
+        iter.getSegment }.toList.asJava, dataMapFormat.getPartitions,
+        defaultDataMap).asScala
+
+      //  delete from local
+      var segment = segmentsPositive.map { iter =>
+        globalQueue.updateWaitingStatus(iter, 
defaultDataMap.getTable.getTableId)
+      }
+
+      while (segmentsNegative != null && segmentsNegative.size != 0) {
+        segmentsWorkStatus = segmentsNegative.map { iter =>
+          val processedSegments = globalQueue.ifProcessSegment(iter.getSegment
+            .getSegmentNo, defaultDataMap.getTable.getTableId)
+          val processedSegmentsList = globalQueue.processSegment(iter,
+            dataMapFormat.getCarbonTable.getTableId)
+          iter
+        }.toSeq
+        segmentsPositive = mutable.HashSet.empty
+        segmentsNegative = mutable.HashSet.empty
+        segmentsWorkStatus.map { iter =>
+          if (iter.getWaiting == false) {
+            segmentsPositive.add(iter)
+          } else {
+            segmentsNegative.add(iter)
+          }
+        }
+        result = result.++(defaultDataMap.getBlockRowCount(segmentsPositive
 
 Review comment:
  Please remove all the mutable collections and `var`s; I don't see any real need for
them here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to