Seth Bromberger created SPARK-18916:
---------------------------------------
Summary: Bug in Pregel / mergeMsg with hashmaps
Key: SPARK-18916
URL: https://issues.apache.org/jira/browse/SPARK-18916
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 2.0.2
Environment: OSX / IntelliJ IDEA 2016.3 CE EAP, Scala 2.11.8, Spark
2.0.2
Reporter: Seth Bromberger
Consider the following (rough) code, which attempts to calculate all-pairs
shortest paths via Pregel:
{code:scala}
/**
 * Attempts to compute all-pairs shortest paths with the GraphX Pregel API.
 *
 * Each vertex is given a map keyed by vertex id, seeded with its own id at
 * distance 0, and Pregel is run on the reversed graph. Messages are maps of
 * candidate `ParentDist` entries; colliding keys are resolved with
 * `ParentDist.min`.
 *
 * NOTE(review): `g`, `VD`, `ED` and `ParentDist` are declared outside this
 * snippet; `ParentDist` is assumed to be (parent vertex id, distance) with a
 * `dist` field and a `min` method -- confirm against its definition.
 *
 * @return for every vertex, its id paired with the map it holds when Pregel finishes
 */
def allPairsShortestPaths: RDD[(VertexId, HashMap[VertexId, ParentDist])] = {
  // Sentinel first message: its negative distance is dropped by the filter in vprog.
  val initialMsg = HashMap(-1L -> ParentDist(-1L, -1L))
  val pregelg = g.mapVertices((vid, vd) =>
    (vd, HashMap[VertexId, ParentDist](vid -> ParentDist(vid, 0L)))).reverse

  /** Vertex program: merges the incoming message into the vertex map, dropping negative distances. */
  def vprog(
      v: VertexId,
      value: (VD, HashMap[VertexId, ParentDist]),
      message: HashMap[VertexId, ParentDist]): (VD, HashMap[VertexId, ParentDist]) = {
    val updatedValues = mm2(value._2, message).filter(v => v._2.dist >= 0)
    (value._1, updatedValues)
  }

  /**
   * Sends to the destination vertex every source-map entry that the destination
   * either lacks or holds at a distance greater than the source distance plus one.
   * Sent entries are re-parented to the source vertex with the distance incremented.
   */
  def sendMsg(triplet: EdgeTriplet[(VD, HashMap[VertexId, ParentDist]), ED])
      : Iterator[(VertexId, HashMap[VertexId, ParentDist])] = {
    val dstVertexId = triplet.dstId
    val srcMap = triplet.srcAttr._2
    val dstMap = triplet.dstAttr._2 // guaranteed to have dstVertexId as a key
    val updatesToSend: HashMap[VertexId, ParentDist] = srcMap.filter {
      case (vid, srcPD) => dstMap.get(vid) match {
        case Some(dstPD) => dstPD.dist > srcPD.dist + 1 // if it exists, is it cheaper?
        case _ => true // not found - new update
      }
    }.map(u => u._1 -> ParentDist(triplet.srcId, u._2.dist + 1))
    if (updatesToSend.nonEmpty)
      Iterator[(VertexId, HashMap[VertexId, ParentDist])]((dstVertexId, updatesToSend))
    else
      Iterator.empty
  }

  /** Combines two messages bound for the same vertex, keeping `v1.min(v2)` on key collisions. */
  def mergeMsg(
      m1: HashMap[VertexId, ParentDist],
      m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
    // when the following two lines are commented out, the program fails with
    // 16/12/17 19:53:50 INFO DAGScheduler: Job 24 failed: reduce at
    // VertexRDDImpl.scala:88, took 0.244042 s
    // Exception in thread "main" org.apache.spark.SparkException: Job
    // aborted due to stage failure: Task 0 in stage 1099.0 failed 1 times, most
    // recent failure: Lost task 0.0 in stage 1099.0 (TID 129, localhost):
    // scala.MatchError: (null,null) (of class scala.Tuple2)
    //
    // NOTE(review): presumably traversing the maps forces each entry to materialise
    // its (key, value) pair before `merged` hands it to the merge function -- confirm
    // against the Scala 2.11 immutable.HashMap implementation.
    m1.foreach(_ => ())
    m2.foreach(_ => ())
    m1.merged(m2) {
      case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
    }
  }

  // mm2 is here just to provide a separate function for vprog. Ideally
  // we'd just re-use mergeMsg.
  /** Same merge as mergeMsg, without the forced traversal; throws on any pair the first case rejects. */
  def mm2(
      m1: HashMap[VertexId, ParentDist],
      m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
    m1.merged(m2) {
      case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
      case n => throw new Exception("we've got a problem: " + n)
    }
  }

  val pregelRun = pregelg.pregel(initialMsg)(vprog, sendMsg, mergeMsg)
  val sps = pregelRun.vertices.map(v => v._1 -> v._2._2)
  sps
}
}
{code}
Note the comment in the mergeMsg function: when the messages are explicitly
traversed (the two no-op foreach calls) prior to the .merged call, the code
works. If these statements are removed / commented out, the error message
shown in the comments is generated.
This fails consistently on a 50-node undirected cycle graph.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]