[
https://issues.apache.org/jira/browse/SPARK-29878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064297#comment-17064297
]
CacheCheck commented on SPARK-29878:
------------------------------------
I also found that newEdges.cache() in GraphImpl.partitionBy(),
vertices.cache() in GraphImpl.subgraph(), and vertices.cache() in
GraphImpl.aggregateMessagesWithActiveSet() have the same problem: they are
unnecessary caches when I run TriangleCountingExample from the examples.graphx package.
> Improper cache strategies in GraphX
> -----------------------------------
>
> Key: SPARK-29878
> URL: https://issues.apache.org/jira/browse/SPARK-29878
> Project: Spark
> Issue Type: Improvement
> Components: GraphX
> Affects Versions: 3.0.0
> Reporter: CacheCheck
> Priority: Major
>
> I have run examples.graphx.SSSPExample and looked through the RDD dependency
> graphs as well as the persist operations. There are some improper cache
> strategies in GraphX. The same situations also occur when I run
> ConnectedComponentsExample.
> 1. vertices.cache() and newEdges.cache() are unnecessary
> In SSSPExample, a graph is initialized by GraphImpl.mapVertices(). In this
> method, a GraphImpl object is created using GraphImpl.apply(vertices, edges),
> and the RDDs vertices and newEdges are cached in apply(). But these two RDDs
> are not used directly again in SSSPExample (their child RDDs have been
> cached), so the persists are unnecessary here.
> However, other examples may need these two persists, so I think they
> cannot simply be removed. It might be hard to fix.
> {code:scala}
> def apply[VD: ClassTag, ED: ClassTag](
>     vertices: VertexRDD[VD],
>     edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
>   vertices.cache() // It is unnecessary for SSSPExample and ConnectedComponentsExample
>   // Convert the vertex partitions in edges to the correct type
>   val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
>     .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
>     .cache() // It is unnecessary for SSSPExample and ConnectedComponentsExample
>   GraphImpl.fromExistingRDDs(vertices, newEdges)
> }
> {code}
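> One way to see which RDDs are still marked as persisted after such a call is
> SparkContext.getPersistentRDDs. The following is only a minimal sketch: the
> object name CacheInspectionSketch, the helper describePersisted, and the tiny
> three-edge graph are made up for illustration and are not part of GraphX or
> of CacheCheck.
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, Graph}
>
> object CacheInspectionSketch {
>   // Hypothetical helper: one line per RDD currently marked as persisted.
>   def describePersisted(sc: SparkContext): Seq[String] =
>     sc.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
>       val name = Option(rdd.name).getOrElse("unnamed")
>       s"RDD $id ($name): ${rdd.getStorageLevel.description}"
>     }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("cache-inspection-sketch")
>     val sc = new SparkContext(conf)
>     try {
>       // Illustrative toy graph, initialized the way SSSPExample does it.
>       val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0), Edge(3L, 1L, 1.0)))
>       val graph = Graph.fromEdges(edges, defaultValue = 0.0)
>       val initial = graph.mapVertices((id, _) =>
>         if (id == 1L) 0.0 else Double.PositiveInfinity)
>       initial.vertices.count() // run an action so that caching takes effect
>       describePersisted(sc).foreach(println)
>     } finally {
>       sc.stop()
>     }
>   }
> }
> {code}
> Comparing this listing with the RDDs that later actions actually read gives a
> rough, manual version of the check described above.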
> 2. Missing persist on newEdges
> SSSPExample invokes Pregel to do the execution. Pregel utilizes
> ReplicatedVertexView.upgrade(). I find that the RDD newEdges is used directly
> by multiple actions in Pregel, so newEdges should be persisted.
> As with the issue above, this one is also found in
> ConnectedComponentsExample. It is also hard to fix, because the added persist
> may be unnecessary for other examples.
> {code:scala}
> // Pregel.scala
> // compute the messages
> var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
> val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
>   checkpointInterval, graph.vertices.sparkContext)
> messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
> var activeMessages = messages.count() // The first action that uses newEdges
> ...
> while (activeMessages > 0 && i < maxIterations) {
>   // Receive the messages and update the vertices.
>   prevG = g
>   g = g.joinVertices(messages)(vprog) // Generating g depends on newEdges
>   ...
>   activeMessages = messages.count() // The second action that uses newEdges.
>   // newEdges should be unpersisted after this instruction.
> {code}
> {code:scala}
> // ReplicatedVertexView.scala
> def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
>   ...
>     val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
>       (ePartIter, shippedVertsIter) => ePartIter.map {
>         case (pid, edgePartition) =>
>           (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
>       }
>     })
>     edges = newEdges // newEdges should be persisted
>     hasSrcId = includeSrc
>     hasDstId = includeDst
>   }
> }
> {code}
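> The cost of a missing persist can be illustrated outside GraphX. The sketch
> below is a toy under stated assumptions, not the Pregel code path: the object
> RecomputeSketch and its method evaluations are hypothetical names, and a plain
> mapped RDD stands in for newEdges. It counts how often the map function runs
> when the same RDD feeds two actions, with and without cache().
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object RecomputeSketch {
>   // Hypothetical helper: runs two actions on one derived RDD and returns
>   // how many times the map function was evaluated in total.
>   def evaluations(sc: SparkContext, persist: Boolean): Long = {
>     val counter = sc.longAccumulator(s"evaluations-persist-$persist")
>     val derived = sc.parallelize(1 to 100, 4).map { x =>
>       counter.add(1) // side effect used only to observe recomputation
>       x * 2
>     }
>     if (persist) derived.cache()
>     derived.count() // first action
>     derived.count() // second action; recomputes the map unless cached
>     if (persist) derived.unpersist(blocking = true)
>     counter.sum
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("recompute-sketch")
>     val sc = new SparkContext(conf)
>     try {
>       println(s"without persist: ${evaluations(sc, persist = false)} evaluations")
>       println(s"with persist:    ${evaluations(sc, persist = true)} evaluations")
>     } finally {
>       sc.stop()
>     }
>   }
> }
> {code}
> In a local run without task failures, the unpersisted variant should report
> roughly twice as many evaluations as the persisted one, which is the overhead
> that persisting newEdges would avoid.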
> I don't have much knowledge of GraphX, so I don't know how to fix these
> issues well.
> This issue was reported by our tool CacheCheck, which dynamically detects
> misuses of the persist()/unpersist() API.