Hi,

I am trying to aggregate the values within each partition locally, without shuffling them across partitions.
For example, 

Before:
worker 1:          worker 2:
1, 2, 1            2, 1, 2

After:
worker 1:          worker 2:
(1->2), (2->1)     (1->1), (2->2)
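
In other words, inside one partition I just want an occurrence count for each value. Outside of Spark, a minimal sketch of that counting (plain Scala; countLocally is just an illustrative name, with the result built as a Map and turned back into an iterator) would be:

def countLocally(iter: Iterator[Int]): Iterator[(Int, Int)] = {
  // Illustrative helper: fold the partition's values into (value -> count) pairs.
  iter.foldLeft(Map.empty[Int, Int]) { (counts, x) =>
    counts + (x -> (counts.getOrElse(x, 0) + 1))
  }.iterator
}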

I tried to use mapPartitions:
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.LongMap

object MyTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("This is a test")
    val sc = new SparkContext(conf)

    // 9 values spread over 3 partitions.
    val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)

    // Count the values locally inside each partition.
    val result = fDB.mapPartitions(testMP).collect()
    println(result.mkString(", "))

    sc.stop()
  }

  // For one partition: count how often each value occurs and
  // return the (value -> count) pairs as an iterator.
  def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
    val result = new LongMap[Int]()
    var cur = 0L

    while (iter.hasNext) {
      cur = iter.next().toLong
      if (result.contains(cur)) {
        result(cur) += 1
      } else {
        result(cur) = 1
      }
    }
    result.toList.iterator
  }
}

But I keep getting the following error, no matter what I try.

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
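
For reference, the same per-partition counting written with a plain scala.collection.mutable.HashMap instead of LongMap would look like the sketch below (testMPWithHashMap is just an illustrative name). I am not sure whether LongMap matters here at all; I only mention it because LongMap was added in Scala 2.11, so a Scala version mismatch between my build and the cluster could perhaps be related.

def testMPWithHashMap(iter: Iterator[Int]): Iterator[(Int, Int)] = {
  // Same per-partition counting as testMP, but with the general-purpose HashMap.
  val counts = scala.collection.mutable.HashMap.empty[Int, Int]
  for (x <- iter) {
    counts(x) = counts.getOrElse(x, 0) + 1
  }
  counts.iterator
}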

Can anybody help me? Thanks.


