Hi, I am trying to aggregate the values within each partition independently. For example,
Before:
worker 1: worker 2:
1, 2, 1 2, 1, 2
After:
worker 1: worker 2:
(1->2), (2->1) (1->1), (2->2)
I tried to use mapPartitions:
object MyTest {

  /** Entry point: builds a small RDD with 3 partitions and prints the
    * per-partition value counts produced by [[testMP]].
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("This is a test")
    val sc = new SparkContext(conf)
    // 3 partitions so each worker aggregates only its own slice of the data.
    val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
    // FIX: Scala is case-sensitive — the RDD method is `mapPartitions`,
    // not `mappartitions` (the original would not even compile).
    val result = fDB.mapPartitions(testMP).collect()
    println(result.mkString)
    sc.stop() // side-effecting 0-arity method: keep the parentheses
  }

  /** Counts occurrences of each value within a single partition.
    *
    * @param iter the elements of one partition
    * @return an iterator of (value, count) pairs for that partition only
    */
  def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
    // NOTE(review): scala.collection.mutable.LongMap only exists in the
    // Scala 2.11+ standard library. If the cluster runs a Spark build
    // compiled against Scala 2.10, tasks will fail at runtime here with a
    // NoClassDefFoundError/NoSuchMethodError — verify the Scala version of
    // the deployed Spark matches the one this code is compiled with.
    val counts = new LongMap[Int]()
    for (v <- iter) {
      val key = v.toLong // use 0-argument getOrElse-update instead of contains + branch
      counts(key) = counts.getOrElse(key, 0) + 1
    }
    counts.toList.iterator
  }
}
But I get the following error message no matter what I try:
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependent
Stages(DAGScheduler.scala:1204)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 1
Can anybody help me? Thanks!
--
View this message in context:
http://apache-spark-developers-list.1001551.n3.nabble.com/problem-with-using-mapPartitions-tp12514.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
