[
https://issues.apache.org/jira/browse/SPARK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14703164#comment-14703164
]
Aram Mkrtchyan edited comment on SPARK-5480 at 8/19/15 3:21 PM:
----------------------------------------------------------------
According to our investigation, Spark stores a mapping for every partition based on the RDD's data and assumes that the underlying data does not change during computation. When an RDD returns different results each time it is computed (because its source is mutable), the data may no longer match the mapping built from a previous state. This can happen even if you persist the RDD: for example, if it does not fit in cluster memory, an executor may evict a cached partition and force recomputation. In that case the EdgePartition's data stays the same, but the RDD's data can mutate (gain new items).
The solution is not obvious; options include making EdgePartition updatable, or writing a snapshot of the current data somewhere durable (HDFS, for example) and reading it back from there to build the graph.
The following code illustrates what I mean:

import scala.util.Random
import org.apache.spark.graphx.{Edge, EdgeRDD, Graph}
import org.apache.spark.storage.StorageLevel

// The map closure is non-deterministic: every recomputation of the RDD
// produces different edge ids, so later passes see different data.
val edges = EdgeRDD.fromEdges[Int, Int](sparkContext.makeRDD(getEdges, 5).map { t =>
  val r = Random.nextInt()
  Edge(r * t.srcId, r * t.dstId, t.attr)
})
val graph = Graph.fromEdges(edges, -1, StorageLevel.NONE, StorageLevel.NONE)
edges.persist()
edges.count()
// Unpersisting forces the next action to recompute the edges from the
// mutable source, so they no longer match the mappings built above.
edges.unpersist()
graph.subgraph(epred = t => t.srcAttr != -1).triplets.collect()
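As a sketch of the snapshot idea above (not a tested fix; the checkpoint directory path and the `getEdges` helper are assumptions carried over from the snippet), one could checkpoint the edge RDD to reliable storage before building the graph, so that any recomputation replays the durable snapshot instead of the mutable source:

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical checkpoint location; adjust to your cluster.
sparkContext.setCheckpointDir("hdfs:///tmp/graphx-checkpoints")

val snapshot = sparkContext.makeRDD(getEdges, 5)
  .map(t => Edge(t.srcId, t.dstId, t.attr))
snapshot.checkpoint() // truncate lineage; recomputation reads the checkpoint files
snapshot.count()      // force materialization before the graph is constructed

val graph = Graph.fromEdges(snapshot, defaultValue = -1)
```

After `checkpoint()` plus an action, lost partitions are re-read from HDFS rather than recomputed through the non-deterministic map, so the per-partition mappings stay consistent.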
> GraphX pageRank: java.lang.ArrayIndexOutOfBoundsException:
> -----------------------------------------------------------
>
> Key: SPARK-5480
> URL: https://issues.apache.org/jira/browse/SPARK-5480
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.2.0, 1.3.1
> Environment: Yarn client
> Reporter: Stephane Maarek
>
> Running the following code:
> val subgraph = graph.subgraph(
>   vpred = (id, article) => // working predicate
> ).cache()
> println(s"Subgraph contains ${subgraph.vertices.count} nodes and ${subgraph.edges.count} edges")
> val prGraph = subgraph.staticPageRank(5).cache
> val titleAndPrGraph = subgraph.outerJoinVertices(prGraph.vertices) {
>   (v, title, rank) => (rank.getOrElse(0.0), title)
> }
> titleAndPrGraph.vertices.top(13) {
>   Ordering.by((entry: (VertexId, (Double, _))) => entry._2._1)
> }.foreach(t => println(t._2._2._1 + ": " + t._2._1 + ", id:" + t._1))
> It returns a graph with 5000 nodes and 4000 edges,
> then crashes during the PageRank computation with the following:
> 15/01/29 05:51:07 INFO scheduler.TaskSetManager: Starting task 125.0 in stage 39.0 (TID 1808, *HIDDEN, PROCESS_LOCAL, 2059 bytes)
> 15/01/29 05:51:07 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 39.0 (TID 1794, *HIDDEN): java.lang.ArrayIndexOutOfBoundsException: -1
>         at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
>         at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
>         at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>         at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:110)
>         at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:108)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)