Hi, I am trying to aggregate the values within each partition internally. For example,
Before: worker 1: worker 2: 1, 2, 1 2, 1, 2 After: worker 1: worker 2: (1->2), (2->1) (1->1), (2->2) I tried to use mapPartitions: object MyTest { def main(args: Array[String]) { val conf = new SparkConf().setAppName("This is a test") val sc = new SparkContext(conf) val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3) val result = fDB.mapPartitions(testMP).collect println(result.mkString) sc.stop } def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = { var result = new LongMap[Int]() var cur = 0l while (iter.hasNext) { cur = iter.next.toLong if (result.contains(cur)) { result(cur) += 1 } else { result += (cur, 1) } } result.toList.iterator } } But I got the following error message no matter what I tried. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1 Can anybody help me? 
Thx -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/problem-with-using-mapPartitions-tp12514.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org