[ https://issues.apache.org/jira/browse/SPARK-29878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010317#comment-17010317 ]

Dong Wang commented on SPARK-29878:
-----------------------------------

So are these unnecessary caches tolerable?

The cached data is used only once in these cases, i.e., SSSPExample and 
ConnectedComponentsExample, but I understand that the caches are necessary for 
most other cases.

Is there a perfect way to handle all cases well?
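
One imperfect option (just a sketch on my side, not an existing GraphX API) would be to let callers that know their usage pattern opt out of the internal caching, e.g. a hypothetical flag on GraphImpl.apply:

{code:scala}
// Hypothetical sketch only: GraphImpl.apply takes no such flag today.
def apply[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED],
    cacheIntermediate: Boolean = true): GraphImpl[VD, ED] = {
  if (cacheIntermediate) vertices.cache() // callers like SSSPExample could pass false
  val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
  if (cacheIntermediate) newEdges.cache()
  GraphImpl.fromExistingRDDs(vertices, newEdges)
}
{code}

The default keeps today's behavior, so only callers that know the two RDDs are consumed exactly once would change anything.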

> Improper cache strategies in GraphX
> -----------------------------------
>
>                 Key: SPARK-29878
>                 URL: https://issues.apache.org/jira/browse/SPARK-29878
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> I have run examples.graphx.SSSPExample and looked through the RDD dependency 
> graphs as well as the persist operations. There are some improper cache 
> strategies in GraphX. The same issues also appear when I run 
> ConnectedComponentsExample.
> 1. vertices.cache() and newEdges.cache() are unnecessary
> In SSSPExample, a graph is initialized via GraphImpl.mapVertices(). In this 
> method, a GraphImpl object is created using GraphImpl.apply(vertices, edges), 
> and the RDDs vertices/newEdges are cached in apply(). But these two RDDs are 
> never used directly again in SSSPExample (their child RDDs have been cached), 
> so the persists are unnecessary here (see the sketch after the code below).
> However, other examples may need these two persists, so they cannot simply be 
> removed. This might be hard to fix.
> {code:scala}
>   def apply[VD: ClassTag, ED: ClassTag](
>       vertices: VertexRDD[VD],
>       edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
>     vertices.cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>     // Convert the vertex partitions in edges to the correct type
>     val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
>       .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
>       .cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>     GraphImpl.fromExistingRDDs(vertices, newEdges)
>   }
> {code}
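> For context, here is roughly how SSSPExample reaches apply(). This is 
> paraphrased from the Spark examples, not verbatim, so details may differ 
> slightly: mapVertices changes the vertex attribute type from Long to Double, 
> which goes through GraphImpl.apply and triggers both cache() calls, even 
> though the example only ever acts on the Pregel result derived from the 
> returned graph.
> {code:scala}
> // Paraphrased sketch of the relevant part of SSSPExample (not verbatim).
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.{Graph, VertexId}
> import org.apache.spark.graphx.util.GraphGenerators
>
> def buildInitialGraph(sc: SparkContext, sourceId: VertexId): Graph[Double, Double] = {
>   val graph: Graph[Long, Double] =
>     GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
>   // mapVertices changes VD, so it calls GraphImpl.apply, which caches
>   // vertices and newEdges; the example never reuses those two RDDs directly.
>   graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
> }
> {code}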
> 2. Missing persist on newEdges
> SSSPExample invokes Pregel to do the computation, and Pregel in turn uses 
> ReplicatedVertexView.upgrade(). I find that the RDD newEdges is used directly 
> by multiple actions in Pregel, so newEdges should be persisted.
> As with the issue above, this one also appears in ConnectedComponentsExample. 
> It is likewise hard to fix, because the added persist may be unnecessary for 
> other examples. A self-contained illustration of the underlying rule follows 
> the two snippets below.
> {code:scala}
> // Pregel.scala
>     // compute the messages
>     var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
>     val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
>       checkpointInterval, graph.vertices.sparkContext)
>     messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
>     var activeMessages = messages.count() // The first action to use newEdges
>     ...
>     while (activeMessages > 0 && i < maxIterations) {
>       // Receive the messages and update the vertices.
>       prevG = g
>       g = g.joinVertices(messages)(vprog) // Generating g depends on newEdges
>       ...
>       activeMessages = messages.count() // The second action to use newEdges;
>                                         // newEdges should be unpersisted after this instruction.
> {code}
> {code:scala}
> // ReplicatedVertexView.scala
>   def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
>       ...
>       val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
>         (ePartIter, shippedVertsIter) => ePartIter.map {
>           case (pid, edgePartition) =>
>             (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
>         }
>       })
>       edges = newEdges // newEdges should be persisted
>       hasSrcId = includeSrc
>       hasDstId = includeDst
>     }
>   }
> {code}
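> The rule relied on above can be shown with a plain RDD. This is only a 
> self-contained sketch of the general pattern, not GraphX code: an RDD that is 
> consumed by more than one action should be persisted before the first action 
> and unpersisted after the last one.
> {code:scala}
> // Minimal, runnable illustration of persist-across-multiple-actions.
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.storage.StorageLevel
>
> object PersistAcrossActions {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
>     val sc = spark.sparkContext
>
>     val messages = sc.parallelize(1 to 1000).map(i => (i.toLong, i * 2))
>     messages.persist(StorageLevel.MEMORY_ONLY) // without this, each action recomputes the lineage
>
>     val first = messages.count()       // first action, materializes the cache
>     val second = messages.values.sum() // second action, reads the cached partitions
>     println(s"count=$first, sum=$second")
>
>     messages.unpersist() // last use is done, release the storage
>     spark.stop()
>   }
> }
> {code}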
> As I don't have much knowledge about GraphX, I don't know how to fix these 
> issues well.
> This issue was reported by our tool CacheCheck, which dynamically detects 
> persist()/unpersist() API misuses.


