Julien MASSIOT created SPARK-19023:
--------------------------------------
Summary: Memory leak on GraphX with an iterative algorithm and
checkpoint on the graph
Key: SPARK-19023
URL: https://issues.apache.org/jira/browse/SPARK-19023
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 2.0.2
Reporter: Julien MASSIOT
I am facing OOMs within a Spark Streaming application that uses GraphX.
While trying to reproduce the issue in a simple application, I was able to identify what appear to be two kinds of memory leaks.

*Leak 1*

It can be reproduced with the following simple Scala application (which simulates more or less what I am doing in my Spark Streaming application, each iteration of the loop simulating one micro-batch).
{code:title=TestGraph.scala|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TestGraph {

  case class ImpactingEvent(entityInstance: String)
  case class ImpactedNode(entityInstance: String)
  case class RelationInstance(relationType: String)

  var impactingGraph: Graph[ImpactedNode, RelationInstance] = null

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
    conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "True")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // A checkpoint directory is required for the checkpoint() calls below; any writable path works.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
      (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
    val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
      Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
    impactingGraph = Graph(vertices, edges, null)

    // Each iteration simulates one micro-batch of the streaming application.
    for (x <- 1 to 10) {
      impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
      impactingGraph.checkpoint()
      impactingGraph.edges.count()
      impactingGraph.vertices.count()
    }

    println("Hello")
    Thread.sleep(10000000) // keep the driver alive so a heap dump can be taken
  }

  private def propagateEvent(
      impactingGraph: Graph[ImpactedNode, RelationInstance],
      event: ImpactingEvent,
      sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
    val graph = impactingGraph.mapVertices((id, node) => node).cache()
    impactingGraph.unpersist(blocking = true)
    graph.cache()
  }
}
{code}
In this simple application, I just apply a mapVertices transformation to the graph and then checkpoint the graph, and I repeat this operation 10 times.

After the application has finished the loop, I take a heap dump.
In this heap dump, I can see 11 "live" GraphImpl instances in memory.
My expectation is to have only 1 (the one referenced by the global variable impactingGraph).

The "leak" comes from the f function of a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
This f closure holds an outer reference to the graph created in the previous iteration.
I can see that the clearDependencies method of MapPartitionsRDD does not reset the f function to null.
While looking for similar issues, I found this pull request:
[https://github.com/apache/spark/pull/3545]
In that pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
I do not understand why the same is not done in MapPartitionsRDD.
I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and it solved my leak in this simple use case.
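For reference, the change I tried looks roughly like this (a simplified sketch of MapPartitionsRDD as it appears in the 2.0.x sources, not the exact diff I applied; the only modification is turning f into a var and dropping it in clearDependencies, mirroring what the pull request above does for ZippedPartitionsRDD):
{code:title=MapPartitionsRDD.scala (sketch of the patch)|borderStyle=solid}
// Sketch only: this class lives in the org.apache.spark.rdd package of spark-core.
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    var f: (TaskContext, Int, Iterator[T]) => Iterator[U], // changed from a plain constructor parameter to a var
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
    f = null // proposed change: release the closure (and its outer reference to the previous graph) as well
  }
}
{code}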
Don't you think the f variable should also be reset to null in MapPartitionsRDD?
*Leak 2*
Now I do the same thing, but in the propagateEvent method, in addition to the mapVertices, I also do a joinVertices on the graph.
It can be found in the following application:
{code:title=TestGraph.scala|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object TestGraph {

  case class ImpactingEvent(entityInstance: String)
  case class ImpactedNode(entityInstance: String)
  case class RelationInstance(relationType: String)

  var impactingGraph: Graph[ImpactedNode, RelationInstance] = null

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
    conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "True")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // A checkpoint directory is required for the checkpoint() calls below; any writable path works.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
      (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
    val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
      Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
    impactingGraph = Graph(vertices, edges, null)

    // Each iteration simulates one micro-batch of the streaming application.
    for (x <- 1 to 10) {
      impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
      impactingGraph.checkpoint()
      impactingGraph.edges.count()
      impactingGraph.vertices.count()
    }

    println("Hello")
    Thread.sleep(10000000) // keep the driver alive so a heap dump can be taken
  }

  private def propagateEvent(
      impactingGraph: Graph[ImpactedNode, RelationInstance],
      event: ImpactingEvent,
      sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
    var graph = impactingGraph.mapVertices((id, node) => node).cache()
    val verticesToJoin: RDD[(VertexId, String)] =
      sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
    graph = graph.joinVertices(verticesToJoin) { (id, src, toJoin) => src }
    impactingGraph.unpersist(blocking = true)
    graph.cache()
  }
}
{code}
When running this application and taking a heap dump, I can still see 11 "live" GraphImpl instances in memory (where I expect only 1), even with the patch described in the previous section.
When analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable of the EdgeRDD (this array of partitions contains a reference to the MapPartitionsRDD that references the graph created by the previous iteration, similarly to what is described in the *Leak 1* section).
This array of partitions is referenced twice:
* once in the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
* once in the "partitions_" variable of the EdgeRDD itself
This comes from the getPartitions method of the EdgeRDD:
{code:title=EdgeRDD.scala|borderStyle=solid}
  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
{code}
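The reason this matters is that RDD.partitions caches the result of getPartitions in the RDD's own "partitions_" field, so the EdgeRDD ends up holding its own reference to the same array. Paraphrased (an abridged reading of RDD.scala, not the exact code):
{code:title=RDD.scala (abridged)|borderStyle=solid}
  // Abridged: the first call to partitions stores the array returned by getPartitions
  // into this RDD's own partitions_ field. For an EdgeRDD, getPartitions simply returns
  // partitionsRDD.partitions, so the same array becomes referenced from both RDDs.
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }
{code}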
After checkpoint and count are called on the graph edges, the reference to this array is cleared within the partitionsRDD of the EdgeRDD.
It is done through this method:
{code:title=RDD.scala|borderStyle=solid}
  /**
   * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
   * created from the checkpoint file, and forget its old dependencies and partitions.
   */
  private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null // Forget the constructor argument for dependencies too
  }
{code}
But this is not done for the "partitions_" variable of the EdgeRDD itself.
Indeed, the markCheckpointed() method is not called on the EdgeRDD itself but only on the partitionsRDD embedded within the EdgeRDD.
Because of that, we still have a reference to this array of partitions (which references a MapPartitionsRDD, which in turn references the graph of the previous iteration).
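As far as I can tell, this happens because EdgeRDDImpl forwards checkpointing to the wrapped partitionsRDD (abridged from the 2.0.x sources):
{code:title=EdgeRDDImpl.scala (abridged)|borderStyle=solid}
  // Checkpointing is delegated to the wrapped partitionsRDD; markCheckpointed() therefore
  // runs only on the inner RDD, and the EdgeRDD's own partitions_ field is never cleared.
  override def checkpoint(): Unit = {
    partitionsRDD.checkpoint()
  }
{code}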
I am able to work around this leak by calling checkpoint and count on the edges just after the mapVertices (and before the joinVertices), provided the patch described in the previous section is also applied to MapPartitionsRDD.
But it doesn't seem clean to me.
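For completeness, my reading of that workaround in propagateEvent looks roughly like this (a sketch only, assuming the MapPartitionsRDD patch from the previous section is in place):
{code:title=propagateEvent with the workaround (sketch)|borderStyle=solid}
  private def propagateEvent(
      impactingGraph: Graph[ImpactedNode, RelationInstance],
      event: ImpactingEvent,
      sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
    var graph = impactingGraph.mapVertices((id, node) => node).cache()

    // Workaround: checkpoint and materialize the edges right after mapVertices and before
    // joinVertices, so that the partitions_ array later cached by the EdgeRDD no longer
    // points at a MapPartitionsRDD capturing the previous iteration's graph.
    graph.edges.checkpoint()
    graph.edges.count()

    val verticesToJoin: RDD[(VertexId, String)] =
      sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
    graph = graph.joinVertices(verticesToJoin) { (id, src, toJoin) => src }

    impactingGraph.unpersist(blocking = true)
    graph.cache()
  }
{code}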
In my mind:
* either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is called on the Graph,
* or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the one referenced by the "partitions_" variable of the partitionsRDD (I don't know whether this "partitions_" is really useful on the EdgeRDD).

What do you think?