Repository: spark
Updated Branches:
  refs/heads/branch-1.2 11446a648 -> 27d9f13af


[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li <wi...@qq.com>

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation

(cherry picked from commit e895e0cbecbbec1b412ff21321e57826d2d0a982)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d9f13a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d9f13a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d9f13a

Branch: refs/heads/branch-1.2
Commit: 27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0
Parents: 11446a6
Author: GuoQiang Li <wi...@qq.com>
Authored: Sat Dec 6 00:56:51 2014 -0800
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Sat Dec 6 00:57:02 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  8 ++++++++
 .../apache/spark/graphx/impl/GraphImpl.scala    |  5 +++++
 .../org/apache/spark/graphx/GraphSuite.scala    | 21 ++++++++++++++++++++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915..23538b7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def cache(): Graph[VD, ED]
 
   /**
+   * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to its parent
+   * RDDs will be removed. It is strongly recommended that this Graph is persisted in
+   * memory, otherwise saving it on a file will require recomputation.
+   */
+  def checkpoint(): Unit
+
+  /**
    * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
    * algorithms that modify the vertex attributes but reuse the edges. This method can be used to
    * uncache the vertex attributes of previous iterations once they are no longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6..a617d84 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
+  override def checkpoint(): Unit = {
+    vertices.checkpoint()
+    replicatedVertexView.edges.checkpoint()
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
     // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1dd..9da0064 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import com.google.common.io.Files
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    val checkpointDir = Files.createTempDir()
+    checkpointDir.deleteOnExit()
+    withSpark { sc =>
+      sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
+      val rdd = sc.parallelize(ring)
+      val graph = Graph.fromEdges(rdd, 1.0F)
+      graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
+      val edgesDependencies = graph.edges.partitionsRDD.dependencies
+      val verticesDependencies = graph.vertices.partitionsRDD.dependencies
+      assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+      assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to