[ https://issues.apache.org/jira/browse/SPARK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14703164#comment-14703164 ]

Aram Mkrtchyan edited comment on SPARK-5480 at 8/19/15 3:21 PM:
----------------------------------------------------------------

According to our investigation, Spark stores a mapping for every partition based 
on the RDD's data, and it assumes that the underlying data does not change during 
computation. When an RDD returns different results each time it is computed 
(its source may be mutable), the data may no longer match the mapping built from 
its previous state. This can happen even if you persist the RDD: for example, it 
may not fit in cluster memory, so an executor can lose a cached partition and 
have to recompute it. In that case the EdgePartition's data stays the same, but 
the RDD's data can mutate (gain new items).

The solution is not clear. Options include making EdgePartition updatable, or 
storing a snapshot of the current data somewhere stable (HDFS, for example) and 
reading it back from there to create the graph.

This code pretty much explains what I mean:

import scala.util.Random
import org.apache.spark.graphx.{Edge, EdgeRDD, Graph}
import org.apache.spark.storage.StorageLevel

// The edge RDD is nondeterministic: every recomputation draws a new random
// factor, so the edges differ from the ones the partition mapping was built on.
val edges = EdgeRDD.fromEdges[Int, Int](sparkContext.makeRDD(getEdges, 5).map { t =>
  val r = Random.nextInt()
  Edge(r * t.srcId, r * t.dstId, t.attr)
})
val graph = Graph.fromEdges(edges, -1, StorageLevel.NONE, StorageLevel.NONE)
edges.persist()
edges.count()
edges.unpersist() // force the next access to recompute the edges
graph.subgraph(epred = t => t.srcAttr != -1).triplets.collect()
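
A minimal sketch of the snapshot idea mentioned above, assuming a writable HDFS path (the path name and the tab-separated text format are hypothetical, chosen only for illustration): the mutable edge source is materialized to stable storage once, and the graph is then built from the frozen copy, so any recomputation re-reads the same data.

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical path; any stable, writable location works.
val snapshotPath = "hdfs:///tmp/edge-snapshot"

// Freeze the current contents of the mutable edge source as text.
edges.map(e => s"${e.srcId}\t${e.dstId}\t${e.attr}").saveAsTextFile(snapshotPath)

// Rebuild the edges from the snapshot; recomputing this RDD always
// re-reads the same frozen data, so the partition mappings stay valid.
val frozenEdges = sparkContext.textFile(snapshotPath).map { line =>
  val Array(src, dst, attr) = line.split("\t")
  Edge(src.toLong, dst.toLong, attr.toInt)
}

val graph = Graph.fromEdges(frozenEdges, -1)
```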



> GraphX pageRank: java.lang.ArrayIndexOutOfBoundsException: 
> -----------------------------------------------------------
>
>                 Key: SPARK-5480
>                 URL: https://issues.apache.org/jira/browse/SPARK-5480
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.2.0, 1.3.1
>         Environment: Yarn client
>            Reporter: Stephane Maarek
>
> Running the following code:
>     val subgraph = graph.subgraph (
>       vpred = (id,article) => //working predicate)
>     ).cache()
>     println( s"Subgraph contains ${subgraph.vertices.count} nodes and 
> ${subgraph.edges.count} edges")
>     val prGraph = subgraph.staticPageRank(5).cache
>     val titleAndPrGraph = subgraph.outerJoinVertices(prGraph.vertices) {
>       (v, title, rank) => (rank.getOrElse(0.0), title)
>     }
>     titleAndPrGraph.vertices.top(13) {
>       Ordering.by((entry: (VertexId, (Double, _))) => entry._2._1)
>     }.foreach(t => println(t._2._2._1 + ": " + t._2._1 + ", id:" + t._1))
> Returns a graph with 5000 nodes and 4000 edges.
> Then it crashes during the PageRank with the following:
> 15/01/29 05:51:07 INFO scheduler.TaskSetManager: Starting task 125.0 in stage 
> 39.0 (TID 1808, *HIDDEN, PROCESS_LOCAL, 2059 bytes)
> 15/01/29 05:51:07 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 
> 39.0 (TID 1794, *HIDDEN): java.lang.ArrayIndexOutOfBoundsException: -1
>         at 
> org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
>         at 
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
>         at 
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>         at 
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at 
> org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:110)
>         at 
> org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:108)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
