[ https://issues.apache.org/jira/browse/SPARK-19023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19023:
---------------------------------
    Labels: bulk-closed memory-leak  (was: memory-leak)

> Memory leak on GraphX with an iterative algorithm and checkpoint on the graph
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-19023
>                 URL: https://issues.apache.org/jira/browse/SPARK-19023
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 2.0.2
>            Reporter: Julien MASSIOT
>            Priority: Major
>              Labels: bulk-closed, memory-leak
>
> I am facing an OOM within a Spark Streaming application that uses GraphX.
> While trying to reproduce the issue in a simple application, I was able to
> identify what appear to be two kinds of memory leak.
>
> *Leak 1*
> It can be reproduced with this simple Scala application (which roughly
> simulates what I'm doing in my Spark Streaming application, each iteration
> of the loop standing in for one micro-batch):
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
>
> object TestGraph {
>   case class ImpactingEvent(entityInstance: String)
>   case class ImpactedNode(entityInstance: String)
>   case class RelationInstance(relationType: String)
>
>   var impactingGraph: Graph[ImpactedNode, RelationInstance] = null
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>     conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("ERROR")
>     // A checkpoint directory is required for checkpoint() to work.
>     sc.setCheckpointDir("/tmp/spark-checkpoints")
>
>     val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
>       (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>
>     val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
>       Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>
>     impactingGraph = Graph(vertices, edges, null)
>
>     for (_ <- 1 to 10) {
>       impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>       impactingGraph.checkpoint()
>       impactingGraph.edges.count()
>       impactingGraph.vertices.count()
>     }
>     println("Hello")
>     Thread.sleep(10000000)
>   }
>
>   private def propagateEvent(
>       impactingGraph: Graph[ImpactedNode, RelationInstance],
>       event: ImpactingEvent,
>       sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
>     val graph = impactingGraph.mapVertices((id, node) => node).cache()
>     impactingGraph.unpersist(true)
>     graph
>   }
> }
> {code}
>   
> In this simple application, I just apply a mapVertices transformation to the
> graph and then checkpoint the graph, repeating this operation 10 times.
> After the application finishes the loop, I take a heap dump.
>
> In this heap dump I can see 11 "live" GraphImpl instances in memory.
> My expectation is to have only 1 (the one referenced by the global variable
> impactingGraph).
>   
> The "leak" comes from the f function of a MapPartitionsRDD (which is
> referenced by the partitionsRDD variable of my VertexRDD).
> This f function holds an outer reference to the graph created in the
> previous iteration.
> I can see that in the clearDependencies method of MapPartitionsRDD, the f
> function is not reset to null.
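> The mechanism can be illustrated outside Spark. In Scala, a lambda that
> reads a field of its enclosing instance captures an outer reference to that
> instance, so whoever retains the lambda also retains the instance and
> everything it references. A minimal, hypothetical sketch (plain Scala, not
> Spark code; names are invented for illustration):
> {code:title=OuterReferenceDemo.scala|borderStyle=solid}
> // Hypothetical illustration: the closure captures the enclosing Holder
> // instance because it reads the bigPayload field.
> class Holder {
>   val bigPayload = new Array[Byte](100 * 1024 * 1024)
>   def makeClosure(): Int => Int = x => x + bigPayload.length
> }
>
> object OuterReferenceDemo {
>   def main(args: Array[String]): Unit = {
>     var f: Int => Int = new Holder().makeClosure()
>     // While f is reachable, the 100 MB Holder cannot be collected --
>     // just like MapPartitionsRDD.f retaining the previous graph.
>     f = null // only now can the Holder be garbage collected
>   }
> }
> {code}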
>   
> While looking at similar issues, I found this pull request:
> [https://github.com/apache/spark/pull/3545]
> In that pull request, the f variable is reset to null in the
> clearDependencies method of ZippedPartitionsRDD.
> I do not understand why the same is not done in MapPartitionsRDD.
> I tried patching spark-core to set f to null in clearDependencies of
> MapPartitionsRDD, and it fixed the leak in this simple use case.
> Shouldn't the f variable also be reset to null in MapPartitionsRDD?
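> For reference, the change I tested mirrors the ZippedPartitionsRDD fix. A
> sketch of the patch (against Spark 2.0.2 internals; note that f is a
> constructor val in the actual source and would have to become a var for
> this to compile):
> {code:title=MapPartitionsRDD.scala (patch sketch)|borderStyle=solid}
>   override def clearDependencies(): Unit = {
>     super.clearDependencies()
>     prev = null
>     f = null  // drop the closure and the outer graph reference it captures
>   }
> {code}
>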
> *Leak 2*
> Now I do the same, but in the propagateEvent method, in addition to the
> mapVertices, I also perform a joinVertices on the graph.
> This is shown in the following application:
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
>
> object TestGraph {
>   case class ImpactingEvent(entityInstance: String)
>   case class ImpactedNode(entityInstance: String)
>   case class RelationInstance(relationType: String)
>
>   var impactingGraph: Graph[ImpactedNode, RelationInstance] = null
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>     conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("ERROR")
>     // A checkpoint directory is required for checkpoint() to work.
>     sc.setCheckpointDir("/tmp/spark-checkpoints")
>
>     val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
>       (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>
>     val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
>       Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>
>     impactingGraph = Graph(vertices, edges, null)
>
>     for (_ <- 1 to 10) {
>       impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>       impactingGraph.checkpoint()
>       impactingGraph.edges.count()
>       impactingGraph.vertices.count()
>     }
>     println("Hello")
>     Thread.sleep(10000000)
>   }
>
>   private def propagateEvent(
>       impactingGraph: Graph[ImpactedNode, RelationInstance],
>       event: ImpactingEvent,
>       sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
>     var graph = impactingGraph.mapVertices((id, node) => node).cache()
>     val verticesToJoin: RDD[(VertexId, String)] = sc.parallelize(Array(
>       (1L, "Node1"), (2L, "Node2")))
>     graph = graph.joinVertices(verticesToJoin) { (id, src, toJoin) => src }
>     impactingGraph.unpersist(true)
>     graph.cache()
>   }
> }
> {code}
> When running this application and taking a memory dump, I can still see 11
> "live" GraphImpl instances in memory (where I expect only 1), even with the
> patch described in the previous section.
> Analyzing this dump, I can see that the "leak" comes from a reference to an
> array of partitions held by the "partitions_" variable of the EdgeRDD (this
> array of partitions contains a reference to the MapPartitionsRDD that in
> turn references the graph created by the previous iteration, similarly to
> what is described in the *Leak 1* section).
> This array of partitions is referenced twice:
> * once by the "partitions_" variable of the partitionsRDD embedded within
> the EdgeRDD
> * once by the "partitions_" variable of the EdgeRDD itself
> This comes from the getPartitions method of the EdgeRDD:
> {code:title=EdgeRDD.scala|borderStyle=solid}
>   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
> {code}
> After checkpoint and count are called on the graph's edges, the reference to
> this array is cleaned up within the partitionsRDD of the EdgeRDD.
> This happens through the following call:
> {code:title=RDD.scala|borderStyle=solid}
>   /**
>    * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
>    * created from the checkpoint file, and forget its old dependencies and partitions.
>    */
>   private[spark] def markCheckpointed(): Unit = {
>     clearDependencies()
>     partitions_ = null
>     deps = null    // Forget the constructor argument for dependencies too
>   }
> {code}
> But this is not done for the "partitions_" variable of the EdgeRDD itself.
> Indeed, the markCheckpointed() method is not called on the EdgeRDD itself,
> only on the partitionsRDD embedded within it.
> Because of that, we still hold a reference to this array of partitions
> (which references a MapPartitionsRDD that references the graph of the
> previous iteration).
> I can work around this leak by calling checkpoint and count on the edges
> just after the mapVertices (and before the joinVertices), provided the patch
> described in the previous section is applied to MapPartitionsRDD.
> But that does not seem clean to me.
> In my mind:
> * either the "partitions_" variable of the EdgeRDD should be reset to null
> after a checkpoint is called on the Graph,
> * or the "partitions_" variable of the EdgeRDD should not reference the same
> array of partitions as the one referenced by the "partitions_" variable of
> the partitionsRDD (I don't know whether this "partitions_" is really useful
> on the EdgeRDD).
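> As an illustration of the first option, a purely hypothetical sketch (both
> partitions_ and doCheckpoint() are private members of RDD in spark-core, so
> a real fix would have to live there; the placement below is only an
> assumption about where such a hook could go):
> {code:title=EdgeRDDImpl.scala (hypothetical sketch)|borderStyle=solid}
>   // Hypothetical: after the embedded partitionsRDD has been checkpointed,
>   // also drop the wrapper EdgeRDD's own copy of the partition array, which
>   // otherwise keeps pinning the pre-checkpoint MapPartitionsRDD partitions.
>   override private[spark] def doCheckpoint(): Unit = {
>     super.doCheckpoint()
>     if (partitionsRDD.isCheckpointed) {
>       partitions_ = null
>     }
>   }
> {code}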
> What do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
