spark git commit: [SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence

2016-12-15 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 172a52f5d -> 78062b852


[SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to 
slow convergence

## What changes were proposed in this pull request?

Change the initial value in all PageRank implementations to be `1.0` instead of 
`resetProb` (default `0.15`) and use `outerJoinVertices` instead of 
`joinVertices` so that source vertices get updated in each iteration.
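A minimal sketch of the revised behaviour (simplified from the patch below; `rankUpdates` stands for the per-iteration message sums produced by `aggregateMessages`, and the personalized branch is omitted):

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Every vertex now starts at 1.0 (previously resetProb).
def initRanks[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, Double] =
  graph
    .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
    .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
    .mapVertices((vid, attr) => 1.0)

// outerJoinVertices (not joinVertices) so vertices that received no messages
// are still reset each iteration instead of keeping a stale rank.
def step(rankGraph: Graph[Double, Double], rankUpdates: VertexRDD[Double],
    resetProb: Double): Graph[Double, Double] =
  rankGraph.outerJoinVertices(rankUpdates) { (id, oldRank, msgSumOpt) =>
    resetProb + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
  }
```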

This seems to have been introduced a long time ago in 
https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90

With the exception of graphs with sinks (which currently give incorrect results; 
see SPARK-18847), this gives faster convergence because the sum of ranks is already 
correct (the ranks should sum to the number of vertices).
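A quick sanity check illustrating the point (a sketch, not part of the patch; assumes a SparkContext `sc` and a sink-free edge list at the hypothetical path "edges.txt"):

```scala
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.lib.PageRank

// With all ranks initialized to 1.0, the total rank mass already equals the number
// of vertices, so iterations only have to redistribute it, not grow it from 0.15*N.
val graph = GraphLoader.edgeListFile(sc, "edges.txt")
val ranks = PageRank.run(graph, numIter = 10).vertices
assert(math.abs(ranks.values.sum() - graph.numVertices) < 1e-3)
```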

Convergence comparison benchmark for small graph: http://imgur.com/a/HkkZf
Code for benchmark: 
https://gist.github.com/aray/a7de1f3801a810f8b1fa00c271a1fefd

## How was this patch tested?

Corrected existing unit tests and added a test that verifies the results against 
igraph and NetworkX on a loop with a source.

Author: Andrew Ray 

Closes #16271 from aray/pagerank-initial-value.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78062b85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78062b85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78062b85

Branch: refs/heads/master
Commit: 78062b8521bb02900baeec31992d697fa677f122
Parents: 172a52f
Author: Andrew Ray 
Authored: Thu Dec 15 23:32:10 2016 -0800
Committer: Ankur Dave 
Committed: Thu Dec 15 23:32:10 2016 -0800

--
 .../org/apache/spark/graphx/lib/PageRank.scala  | 24 +++---
 .../apache/spark/graphx/lib/PageRankSuite.scala | 34 +---
 2 files changed, 42 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/78062b85/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index feb3f47..37b6e45 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -115,9 +115,9 @@ object PageRank extends Logging {
 val src: VertexId = srcId.getOrElse(-1L)
 
 // Initialize the PageRank graph with each edge attribute having
-// weight 1/outDegree and each vertex with attribute resetProb.
+// weight 1/outDegree and each vertex with attribute 1.0.
 // When running personalized pagerank, only the source vertex
-// has an attribute resetProb. All others are set to 0.
+// has an attribute 1.0. All others are set to 0.
 var rankGraph: Graph[Double, Double] = graph
   // Associate the degree with each vertex
   .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => 
deg.getOrElse(0) }
@@ -125,7 +125,7 @@ object PageRank extends Logging {
   .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
   // Set the vertex attributes to the initial pagerank values
   .mapVertices { (id, attr) =>
-if (!(id != src && personalized)) resetProb else 0.0
+if (!(id != src && personalized)) 1.0 else 0.0
   }
 
 def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
@@ -150,8 +150,8 @@ object PageRank extends Logging {
 (src: VertexId, id: VertexId) => resetProb
   }
 
-  rankGraph = rankGraph.joinVertices(rankUpdates) {
-(id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+  rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+(id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * 
msgSumOpt.getOrElse(0.0)
   }.cache()
 
   rankGraph.edges.foreachPartition(x => {}) // also materializes 
rankGraph.vertices
@@ -196,7 +196,7 @@ object PageRank extends Logging {
 // we won't be able to store its activations in a sparse vector
 val zero = Vectors.sparse(sources.size, List()).asBreeze
 val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-  val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+  val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
   (vid, v)
 }.toMap
 val sc = graph.vertices.sparkContext
@@ -225,11 +225,11 @@ object PageRank extends Logging {
 ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
 (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
 
-  rankGraph = rankGraph.joinVertices(rankUpdates) {
-(vid, 

spark git commit: [SPARK-9436] [GRAPHX] Pregel simplification patch

2015-07-29 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 5340dfaf9 -> b715933fc


[SPARK-9436] [GRAPHX] Pregel simplification patch

Pregel code contains two consecutive joins:
```
g.vertices.innerJoin(messages)(vprog)
...
g = g.outerJoinVertices(newVerts)
{ (vid, old, newOpt) => newOpt.getOrElse(old) }
```
This can be simplified with one join. ankurdave proposed a patch based on our 
discussion in the mailing list: 
https://www.mail-archive.com/dev@spark.apache.org/msg10316.html
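This is safe because `GraphOps.joinVertices` already keeps the old attribute for vertices that received no message; roughly (a sketch of its behaviour, with `graph`, `VD`, `ED` as in GraphOps):

```scala
// Apply mapFunc only where a message exists, otherwise keep the old attribute --
// exactly what the removed innerJoin + outerJoinVertices pair did by hand.
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED] =
  graph.outerJoinVertices(table) { (vid, old, newOpt) =>
    newOpt.map(u => mapFunc(vid, old, u)).getOrElse(old)
  }
```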

Author: Alexander Ulanov na...@yandex.ru

Closes #7749 from avulanov/SPARK-9436-pregel and squashes the following commits:

8568e06 [Alexander Ulanov] Pregel simplification patch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b715933f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b715933f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b715933f

Branch: refs/heads/master
Commit: b715933fc69a49653abdb2fba0818dfc4f35d358
Parents: 5340dfa
Author: Alexander Ulanov na...@yandex.ru
Authored: Wed Jul 29 13:59:00 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Jul 29 13:59:00 2015 -0700

--
 .../scala/org/apache/spark/graphx/Pregel.scala  | 23 +---
 1 file changed, 10 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b715933f/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index cfcf724..2ca60d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,28 +127,25 @@ object Pregel extends Logging {
 var prevG: Graph[VD, ED] = null
 var i = 0
 while (activeMessages > 0 && i < maxIterations) {
-  // Receive the messages. Vertices that didn't get any messages do not 
appear in newVerts.
-  val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-  // Update the graph with the new vertices.
+  // Receive the messages and update the vertices.
   prevG = g
-  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-  g.cache()
+  g = g.joinVertices(messages)(vprog).cache()
 
   val oldMessages = messages
-  // Send new messages. Vertices that didn't get any messages don't appear 
in newVerts, so don't
-  // get to send messages. We must cache messages so it can be 
materialized on the next line,
-  // allowing us to uncache the previous iteration.
-  messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, 
activeDirection))).cache()
-  // The call to count() materializes `messages`, `newVerts`, and the 
vertices of `g`. This
-  // hides oldMessages (depended on by newVerts), newVerts (depended on by 
messages), and the
-  // vertices of prevG (depended on by newVerts, oldMessages, and the 
vertices of g).
+  // Send new messages, skipping edges where neither side received a 
message. We must cache
+  // messages so it can be materialized on the next line, allowing us to 
uncache the previous
+  // iteration.
+  messages = g.mapReduceTriplets(
+sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+  // The call to count() materializes `messages` and the vertices of `g`. 
This hides oldMessages
+  // (depended on by the vertices of g) and the vertices of prevG 
(depended on by oldMessages
+  // and the vertices of g).
   activeMessages = messages.count()
 
   logInfo("Pregel finished iteration " + i)
 
   // Unpersist the RDDs hidden by newly-materialized RDDs
   oldMessages.unpersist(blocking = false)
-  newVerts.unpersist(blocking = false)
   prevG.unpersistVertices(blocking = false)
   prevG.edges.unpersist(blocking = false)
   // count the iteration





spark git commit: [SPARK-9109] [GRAPHX] Keep the cached edge in the graph

2015-07-17 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master eba6a1af4 -> 587c315b2


[SPARK-9109] [GRAPHX] Keep the cached edge in the graph

The change here is to keep the cached RDDs in the graph object so that they are 
correctly unpersisted when graph.unpersist() is called.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```
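With the fix, the vertex and edge RDDs cached by the Graph constructor are the graph's own RDDs, so a sketch of the expected behaviour of the example above (assuming nothing else was cached) is:

```scala
graph.unpersist()
// No leftover GraphX RDDs should remain registered with the context.
assert(sc.getPersistentRDDs.isEmpty)
```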

Author: tien-dungle tien-dung...@realimpactanalytics.com

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the 
following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/587c315b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/587c315b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/587c315b

Branch: refs/heads/master
Commit: 587c315b204f1439f696620543c38166d95f8a3d
Parents: eba6a1a
Author: tien-dungle tien-dung...@realimpactanalytics.com
Authored: Fri Jul 17 12:11:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jul 17 12:11:32 2015 -0700

--
 .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/587c315b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d2..da95314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
   edgeStorageLevel: StorageLevel,
   vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
 val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
-  .withTargetStorageLevel(edgeStorageLevel).cache()
+  .withTargetStorageLevel(edgeStorageLevel)
 val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
-  .withTargetStorageLevel(vertexStorageLevel).cache()
+  .withTargetStorageLevel(vertexStorageLevel)
 GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -346,9 +346,14 @@ object GraphImpl {
   def apply[VD: ClassTag, ED: ClassTag](
   vertices: VertexRDD[VD],
   edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+vertices.cache()
+
 // Convert the vertex partitions in edges to the correct type
 val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
   .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+  .cache()
+
 GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 





spark git commit: [SPARK-9109] [GRAPHX] Keep the cached edge in the graph

2015-07-17 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 bb1401507 -> f34f3d71f


[SPARK-9109] [GRAPHX] Keep the cached edge in the graph

The change here is to keep the cached RDDs in the graph object so that they are 
correctly unpersisted when graph.unpersist() is called.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```

Author: tien-dungle tien-dung...@realimpactanalytics.com

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the 
following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph

(cherry picked from commit 587c315b204f1439f696620543c38166d95f8a3d)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f34f3d71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f34f3d71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f34f3d71

Branch: refs/heads/branch-1.4
Commit: f34f3d71f6551da5e96b0de99c0f61fa981967f6
Parents: bb14015
Author: tien-dungle tien-dung...@realimpactanalytics.com
Authored: Fri Jul 17 12:11:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jul 17 12:15:20 2015 -0700

--
 .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f34f3d71/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d2..da95314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
   edgeStorageLevel: StorageLevel,
   vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
 val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
-  .withTargetStorageLevel(edgeStorageLevel).cache()
+  .withTargetStorageLevel(edgeStorageLevel)
 val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
-  .withTargetStorageLevel(vertexStorageLevel).cache()
+  .withTargetStorageLevel(vertexStorageLevel)
 GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -346,9 +346,14 @@ object GraphImpl {
   def apply[VD: ClassTag, ED: ClassTag](
   vertices: VertexRDD[VD],
   edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+vertices.cache()
+
 // Convert the vertex partitions in edges to the correct type
 val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
   .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+  .cache()
+
 GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 





spark git commit: [SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions

2015-07-14 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master d267c2834 -> 0a4071eab


[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of 
partitions

See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb
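A sketch of the idea for a non-perfect-square partition count (illustrative only; the committed code is in the diff below and may differ in detail):

```scala
// Keep ceil(sqrt(numParts)) columns; the last column gets however many rows are
// left over, so every requested partition is used and block sizes stay even.
def nonSquarePartition(src: Long, dst: Long, numParts: Int): Int = {
  val mixingPrime = 1125899906842597L
  val cols = math.ceil(math.sqrt(numParts)).toInt
  val rows = (numParts + cols - 1) / cols                // rows in a full column
  val lastColRows = numParts - rows * (cols - 1)         // rows in the short last column
  val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
  val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
  col * rows + row
}
```

For example, numParts = 10 gives 4 columns: three with 3 rows and a last column with 1 row, so all 10 partitions are used.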

Author: Andrew Ray ray.and...@gmail.com

Closes #7104 from aray/edge-partition-2d-improvement and squashes the following 
commits:

3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a 
partition function for a given number of parts. To keep compatibility we define 
default methods that translate between the two implementation options. Made 
EdgePartition2D use the old strategy when we have a perfect square and implement 
the new interface.
5d42105 [Andrew Ray] % - /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non 
perfect squares.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a4071ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a4071ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a4071ea

Branch: refs/heads/master
Commit: 0a4071eab30db1db80f61ed2cb2e7243291183ce
Parents: d267c28
Author: Andrew Ray ray.and...@gmail.com
Authored: Tue Jul 14 13:14:47 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Jul 14 13:14:47 2015 -0700

--
 .../apache/spark/graphx/PartitionStrategy.scala | 32 +---
 1 file changed, 21 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a4071ea/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 7372dfb..70a7592 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
   /**
* Assigns edges to partitions using a 2D partitioning of the sparse edge 
adjacency matrix,
-   * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
*
* Suppose we have a graph with 12 vertices that we want to partition
* over 9 machines.  We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
* that edges adjacent to `v11` can only be in the first column of blocks 
`(P0, P3,
* P6)` or the last
* row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that 
`v11` will need to be
-   * replicated to at most `2 * sqrt(numParts) - 1` machines.
+   * replicated to at most `2 * sqrt(numParts)` machines.
*
* Notice that `P0` has many edges and as a consequence this partitioning 
would lead to poor work
* balance.  To improve balance we first multiply each vertex id by a large 
prime to shuffle the
* vertex locations.
*
-   * One of the limitations of this approach is that the number of machines 
must either be a
-   * perfect square. We partially address this limitation by computing the 
machine assignment to
-   * the next
-   * largest perfect square and then mapping back down to the actual number of 
machines.
-   * Unfortunately, this can also lead to work imbalance and so it is 
suggested that a perfect
-   * square is used.
+   * When the number of partitions requested is not a perfect square we use a 
slightly different
+   * method where the last column can have a different number of rows than the 
others while still
+   * maintaining the same size per block.
*/
   case object EdgePartition2D extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: 
PartitionID): PartitionID = {
   val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
   val mixingPrime: VertexId = 1125899906842597L
-  val col: PartitionID = (math.abs(src * mixingPrime) % 
ceilSqrtNumParts).toInt
-  val row: PartitionID = (math.abs(dst * mixingPrime) % 
ceilSqrtNumParts).toInt
-  (col * ceilSqrtNumParts + row) % numParts
+  if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+// Use old method for perfect squared to ensure we get same results
+val col: PartitionID = (math.abs(src * 

spark git commit: [SPARK-6736][GraphX][Doc]Example of Graph#aggregateMessages has error

2015-04-07 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 6f0d55d76 -> ae980eb41


[SPARK-6736][GraphX][Doc]Example of Graph#aggregateMessages has error

The example of Graph#aggregateMessages has an error.
Since aggregateMessages is a method of Graph, it should be written as 
rawGraph.aggregateMessages.
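For context, a self-contained version of the corrected example (a sketch; assumes a SparkContext `sc` and uses GraphLoader with a hypothetical "followers.txt" instead of the guide's placeholder loader):

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// aggregateMessages is called on the graph instance, not as a bare function.
val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "followers.txt")
val inDeg: RDD[(VertexId, Int)] =
  rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
```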

Author: Sasaki Toru sasaki...@nttdata.co.jp

Closes #5388 from sasakitoa/aggregateMessagesExample and squashes the following 
commits:

b1d631b [Sasaki Toru] Example of Graph#aggregateMessages has error


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae980eb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae980eb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae980eb4

Branch: refs/heads/master
Commit: ae980eb41c00b5f1f64c650f267b884e864693f0
Parents: 6f0d55d
Author: Sasaki Toru sasaki...@nttdata.co.jp
Authored: Tue Apr 7 01:55:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Apr 7 01:55:32 2015 -0700

--
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ae980eb4/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 8494d06..36dc7b0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -409,7 +409,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected 
() extends Serializab
* {{{
* val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
* val inDeg: RDD[(VertexId, Int)] =
-   *   aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
+   *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
* }}}
*
* @note By expressing computation at the edge level we achieve





spark git commit: [SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

2015-03-26 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master aad003227 -> 39fb57968


[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

Adds a `Graph#minus` method which returns only the unique `VertexId`s from the 
calling `VertexRDD`.

To demonstrate a basic example with pseudocode:

```
Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2)))
> Set((0L,0))
```
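The same example with real VertexRDDs (a sketch; assumes an active SparkContext `sc`):

```scala
import org.apache.spark.graphx._

val setA = VertexRDD(sc.parallelize(Seq((0L, 0), (1L, 1))))
val setB = VertexRDD(sc.parallelize(Seq((1L, 1), (2L, 2))))
// Only the vertex ids unique to the calling VertexRDD survive.
setA.minus(setB).collect()   // Array((0L, 0))
```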

Author: Brennon York brennon.y...@capitalone.com

Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits:

248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid 
createUsingIndex and updated the mask operations to simplify with andNot call
3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus 
method
6575d92 [Brennon York] updated mima exclude
aaa030b [Brennon York] completed graph#minus functionality
7227c0f [Brennon York] beginning work on minus functionality


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39fb5796
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39fb5796
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39fb5796

Branch: refs/heads/master
Commit: 39fb57968352549f2276ac4fcd2b92988ed6fe42
Parents: aad0032
Author: Brennon York brennon.y...@capitalone.com
Authored: Thu Mar 26 19:08:09 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Mar 26 19:08:09 2015 -0700

--
 docs/graphx-programming-guide.md|  2 ++
 .../org/apache/spark/graphx/VertexRDD.scala | 16 ++
 .../graphx/impl/VertexPartitionBaseOps.scala| 15 +
 .../spark/graphx/impl/VertexRDDImpl.scala   | 25 +++
 .../apache/spark/graphx/VertexRDDSuite.scala| 33 ++--
 project/MimaExcludes.scala  |  3 ++
 6 files changed, 92 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/docs/graphx-programming-guide.md
--
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c601d79..3f10cb2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -899,6 +899,8 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
   // Transform the values without changing the ids (preserves the internal 
index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
   def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
+  // Show only vertices unique to this set based on their VertexId's
+  def minus(other: RDD[(VertexId, VD)])
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate 
joins (substantially)

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0..a9f04b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@ abstract class VertexRDD[VD](
   def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
 
   /**
+   * For each VertexId present in both `this` and `other`, minus will act as a 
set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other an RDD to run the set operation against
+   */
+  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * For each VertexId present in both `this` and `other`, minus will act as a 
set difference
+   * operation returning only those unique VertexId's present in `this`.
+   *
+   * @param other a VertexRDD to run the set operation against
+   */
+  def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+  /**
* For each vertex present in both `this` and `other`, `diff` returns only 
those vertices with
* differing values; for values that are different, keeps the values from 
`other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548..b90f9fa 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ 

spark git commit: [SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD

2015-03-16 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master aa6536fa3 -> 45f4c6612


[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD

Changed the method signature of 'diff' from VertexRDD[VD] to RDD[(VertexId, VD)] 
to match that of 'innerJoin' and 'leftJoin'. This change maintains backwards 
compatibility and better unifies the VertexRDD methods with each other.
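A usage sketch of the new overload (assumes an active SparkContext `sc`):

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val verts = VertexRDD(sc.parallelize(Seq((1L, 1), (2L, 2))))
val updated: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 99)))
// diff keeps only the vertices whose value changed, taking the value from `other`.
verts.diff(updated).collect()   // Array((2L, 99))
```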

Author: Brennon York brennon.y...@capitalone.com

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure 
that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the 
diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method 
from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary 
compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 
'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45f4c661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45f4c661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45f4c661

Branch: refs/heads/master
Commit: 45f4c66122c57011e74c694a424756812ab77d99
Parents: aa6536f
Author: Brennon York brennon.y...@capitalone.com
Authored: Mon Mar 16 01:06:26 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Mon Mar 16 01:06:26 2015 -0700

--
 .../main/scala/org/apache/spark/graphx/VertexRDD.scala |  9 +
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala   |  4 
 .../scala/org/apache/spark/graphx/VertexRDDSuite.scala | 13 +
 project/MimaExcludes.scala |  3 +++
 4 files changed, 29 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7..ad4bfe0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -126,6 +126,15 @@ abstract class VertexRDD[VD](
* differing values; for values that are different, keeps the values from 
`other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
+   * @param other the other RDD[(VertexId, VD)] with which to diff against.
+   */
+  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * For each vertex present in both `this` and `other`, `diff` returns only 
those vertices with
+   * differing values; for values that are different, keeps the values from 
`other`. This is
+   * only guaranteed to work if the VertexRDDs share a common ancestor.
+   *
* @param other the other VertexRDD with which to diff against.
*/
   def diff(other: VertexRDD[VD]): VertexRDD[VD]

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21..125692d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
   override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
 this.mapVertexPartitions(_.map(f))
 
+  override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+  }
+
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
 val otherPartition = other match {
   case other: VertexRDD[_] if this.partitioner == other.partitioner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd..4f7a442 100644
--- 

spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

2015-02-25 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 eaffc6edd -> 8073767f5


[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue where VertexRDDs that are `diff`ed, `innerJoin`ed, or 
`leftJoin`ed with different partition counts fail inside the `zipPartitions` 
method. This fix tests whether the partitioners are equal and, if not, 
repartitions the other RDD to match the calling VertexRDD.
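A sketch of the previously failing scenario (illustrative; assumes an active SparkContext `sc`): the two VertexRDDs are built with different partition counts, which made the zip-based join throw before this fix.

```scala
import org.apache.spark.graphx._

val a = VertexRDD(sc.parallelize((0L until 100L).map(id => (id, 1)), 2))
val b = VertexRDD(sc.parallelize((0L until 100L).map(id => (id, 2)), 5))
// Different partitioners: with the fix, b is brought onto a's partitioning
// instead of being zipped partition-for-partition and failing.
a.leftJoin(b) { (id, x, yOpt) => x + yOpt.getOrElse(0) }.count()
```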

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned 
vertexRDDs

(cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8073767f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8073767f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8073767f

Branch: refs/heads/branch-1.3
Commit: 8073767f5144b84de4c019c3d07b33a6454a656e
Parents: eaffc6e
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:11:29 2015 -0800

--
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala| 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8073767f/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 6dad167..904be21 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))
 
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+  case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
-  other.partitionsRDD, preservesPartitioning = true
+  otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
   val thisPart = thisIter.next()
   val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
leftZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
   case _ =>
 this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
innerZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
  innerZipJoin(other)(f)
    case _ =>
 this.withPartitionsRDD(





spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

2015-02-25 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master a777c65da -> 9f603fce7


[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue where VertexRDDs that are `diff`ed, `innerJoin`ed, or 
`leftJoin`ed with different partition counts fail inside the `zipPartitions` 
method. This fix tests whether the partitioners are equal and, if not, 
repartitions the other RDD to match the calling VertexRDD.

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned 
vertexRDDs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f603fce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f603fce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f603fce

Branch: refs/heads/master
Commit: 9f603fce78fcc997926e9a72dec44d48cbc396fc
Parents: a777c65
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:11:12 2015 -0800

--
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala| 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9f603fce/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 6dad167..904be21 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))
 
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+  case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
-  other.partitionsRDD, preservesPartitioning = true
+  otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
   val thisPart = thisIter.next()
   val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
leftZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
   case _ =>
 this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
innerZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
  innerZipJoin(other)(f)
    case _ =>
 this.withPartitionsRDD(





spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

2015-02-25 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a9abcaa2c -> 00112baf9


[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue where VertexRDDs that are `diff`ed, `innerJoin`ed, or 
`leftJoin`ed with different partition counts fail inside the `zipPartitions` 
method. This fix tests whether the partitioners are equal and, if not, 
repartitions the other RDD to match the calling VertexRDD.

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned 
vertexRDDs

(cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00112baf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00112baf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00112baf

Branch: refs/heads/branch-1.2
Commit: 00112baf9e9707e9a773e8076dc4ed2957803bfd
Parents: a9abcaa
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:14:22 2015 -0800

--
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala| 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/00112baf/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b..d9bf9fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -94,8 +94,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))
 
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+  case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
-  other.partitionsRDD, preservesPartitioning = true
+  otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
   val thisPart = thisIter.next()
   val otherPart = otherIter.next()
@@ -123,7 +129,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
leftZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
   case _ =>
 this.withPartitionsRDD[VD3](
@@ -152,7 +158,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join 
strategy.
 // If the other set is a VertexRDD then we use the much more efficient 
innerZipJoin
 other match {
-  case other: VertexRDD[_] =>
+  case other: VertexRDD[_] if this.partitioner == other.partitioner =>
  innerZipJoin(other)(f)
    case _ =>
 this.withPartitionsRDD(





spark git commit: SPARK-3290 [GRAPHX] No unpersist callls in SVDPlusPlus

2015-02-14 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 152147f5f -> db5747921


SPARK-3290 [GRAPHX] No unpersist callls in SVDPlusPlus

This just unpersist()s each RDD in this code that was cache()ed.
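The recurring pattern the patch applies around each join (a sketch with placeholder names, not the exact committed code):

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Cache the new graph, force it to materialize, and only then unpersist the old
// one, so each intermediate cache()ed graph is eventually released.
def refresh[VD: ClassTag, ED: ClassTag, A: ClassTag](
    g: Graph[VD, ED], msgs: VertexRDD[A])(update: (VertexId, VD, Option[A]) => VD): Graph[VD, ED] = {
  val gNext = g.outerJoinVertices(msgs)(update).cache()
  gNext.vertices.count()
  gNext.edges.count()
  g.unpersist()
  gNext
}
```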

Author: Sean Owen so...@cloudera.com

Closes #4234 from srowen/SPARK-3290 and squashes the following commits:

66c1e11 [Sean Owen] unpersist() each RDD that was cache()ed

(cherry picked from commit 0ce4e430a81532dc317136f968f28742e087d840)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db574792
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db574792
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db574792

Branch: refs/heads/branch-1.3
Commit: db5747921a648c3f7cf1de6dba70b82584afd097
Parents: 152147f
Author: Sean Owen so...@cloudera.com
Authored: Fri Feb 13 20:12:52 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Feb 13 20:14:49 2015 -0800

--
 .../apache/spark/graphx/lib/SVDPlusPlus.scala   | 40 
 1 file changed, 32 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/db574792/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index f58587e..112ed09 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,17 +72,22 @@ object SVDPlusPlus {
 
 // construct graph
 var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
+materialize(g)
+edges.unpersist()
 
 // Calculate initial bias and norm
 val t0 = g.aggregateMessages[(Long, Double)](
   ctx => { ctx.sendToSrc((1L, ctx.attr)); ctx.sendToDst((1L, ctx.attr)) },
   (g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))
 
-g = g.outerJoinVertices(t0) {
+val gJoinT0 = g.outerJoinVertices(t0) {
   (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[(Long, Double)]) =>
 (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / 
scala.math.sqrt(msg.get._1))
-}
+}.cache()
+materialize(gJoinT0)
+g.unpersist()
+g = gJoinT0
 
 def sendMsgTrainF(conf: Conf, u: Double)
 (ctx: EdgeContext[
@@ -114,12 +119,15 @@ object SVDPlusPlus {
   val t1 = g.aggregateMessages[DoubleMatrix](
 ctx => ctx.sendToSrc(ctx.dstAttr._2),
 (g1, g2) => g1.addColumnVector(g2))
-  g = g.outerJoinVertices(t1) {
+  val gJoinT1 = g.outerJoinVertices(t1) {
 (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
  msg: Option[DoubleMatrix]) =>
   if (msg.isDefined) (vd._1, vd._1
 .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
-  }
+  }.cache()
+  materialize(gJoinT1)
+  g.unpersist()
+  g = gJoinT1
 
   // Phase 2, update p for user nodes and q, y for item nodes
   g.cache()
@@ -127,13 +135,16 @@ object SVDPlusPlus {
 sendMsgTrainF(conf, u),
 (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, 
DoubleMatrix, Double)) =
   (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + 
g2._3))
-  g = g.outerJoinVertices(t2) {
+  val gJoinT2 = g.outerJoinVertices(t2) {
 (vid: VertexId,
  vd: (DoubleMatrix, DoubleMatrix, Double, Double),
  msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
   (vd._1.addColumnVector(msg.get._1), 
vd._2.addColumnVector(msg.get._2),
 vd._3 + msg.get._3, vd._4)
-  }
+  }.cache()
+  materialize(gJoinT2)
+  g.unpersist()
+  g = gJoinT2
 }
 
 // calculate error on training set
@@ -147,13 +158,26 @@ object SVDPlusPlus {
   val err = (ctx.attr - pred) * (ctx.attr - pred)
   ctx.sendToDst(err)
 }
+
 g.cache()
 val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
-g = g.outerJoinVertices(t3) {
+val gJoinT3 = g.outerJoinVertices(t3) {
   (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: 
Option[Double]) =
 if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
-}
+}.cache()
+materialize(gJoinT3)
+g.unpersist()
+g = gJoinT3
 
 (g, u)
   }
+
+  /**
+   * Forces materialization of a Graph by count()ing its RDDs.
+   */
+  private def materialize(g: Graph[_,_]): Unit = {
+g.vertices.count()
+g.edges.count()
+  }
+
 }



spark git commit: [SPARK-5343][GraphX]: ShortestPaths traverses backwards

2015-02-10 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 bba095399 -> 5be8902f7


[SPARK-5343][GraphX]: ShortestPaths traverses backwards

Corrected the logic in ShortestPaths so that the calculation runs forward 
rather than backward. Output before looked like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
  sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))
```

And new output after the changes looks like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
  sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (2,Map()), (3,Map()))
```

Author: Brennon York brennon.y...@capitalone.com

Closes #4478 from brennonyork/SPARK-5343 and squashes the following commits:

aa57f83 [Brennon York] updated to set ShortestPaths to run 'forward' rather 
than 'backward'

(cherry picked from commit 5820961289eb98e45eb467efa316c7592b8d619c)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5be8902f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5be8902f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5be8902f

Branch: refs/heads/branch-1.3
Commit: 5be8902f7a7f6eafbe75a9b2cf9b0cc3e5bc6f2b
Parents: bba0953
Author: Brennon York brennon.y...@capitalone.com
Authored: Tue Feb 10 14:57:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Feb 10 14:57:18 2015 -0800

--
 .../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5be8902f/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 590f047..179f284 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -61,8 +61,8 @@ object ShortestPaths {
 }
 
 def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] 
= {
-  val newAttr = incrementMap(edge.srcAttr)
-  if (edge.dstAttr != addMaps(newAttr, edge.dstAttr)) 
Iterator((edge.dstId, newAttr))
+  val newAttr = incrementMap(edge.dstAttr)
+  if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) 
Iterator((edge.srcId, newAttr))
   else Iterator.empty
 }
 





spark git commit: [SPARK-5343][GraphX]: ShortestPaths traverses backwards

2015-02-10 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master fd2c032f9 -> 582096128


[SPARK-5343][GraphX]: ShortestPaths traverses backwards

Corrected the logic in ShortestPaths so that the calculation runs forward 
rather than backward. Output before looked like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
  sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))
```

And new output after the changes looks like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))),
  sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId,
//   org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (2,Map()), (3,Map()))
```

Author: Brennon York brennon.y...@capitalone.com

Closes #4478 from brennonyork/SPARK-5343 and squashes the following commits:

aa57f83 [Brennon York] updated to set ShortestPaths to run 'forward' rather 
than 'backward'


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58209612
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58209612
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58209612

Branch: refs/heads/master
Commit: 5820961289eb98e45eb467efa316c7592b8d619c
Parents: fd2c032
Author: Brennon York brennon.y...@capitalone.com
Authored: Tue Feb 10 14:57:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Feb 10 14:57:00 2015 -0800

--
 .../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/58209612/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 590f047..179f284 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -61,8 +61,8 @@ object ShortestPaths {
 }
 
 def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] 
= {
-  val newAttr = incrementMap(edge.srcAttr)
-  if (edge.dstAttr != addMaps(newAttr, edge.dstAttr)) 
Iterator((edge.dstId, newAttr))
+  val newAttr = incrementMap(edge.dstAttr)
+  if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) 
Iterator((edge.srcId, newAttr))
   else Iterator.empty
 }
 





spark git commit: [SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...

2015-01-23 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master cef1f092a -> e224dbb01


[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner 
of EdgeRDDImp...

If the value of 'spark.default.parallelism' does not match the number of 
partitions in EdgePartition(EdgeRDDImpl),
the following error occurs in ReplicatedVertexView.scala:72:

object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
graph.aggregateMessages(
  ctx => {
ctx.sendToSrc(1)
ctx.sendToDst(2)
  },
  _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at 
org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...
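For reference, a sketch of a setup that triggers the pre-fix failure (illustrative; assumes `sc` with spark.default.parallelism set to a value other than 4):

```scala
import org.apache.spark.graphx._

// The edge RDD has 4 partitions while the default partitioner would use
// spark.default.parallelism, so the replicated-vertex zip used to mismatch.
val g = GraphLoader.edgeListFile(sc, "graph.txt", numEdgePartitions = 4)
val rdd = g.aggregateMessages[Int](ctx => { ctx.sendToSrc(1); ctx.sendToDst(2) }, _ + _)
rdd.count()   // pre-fix: "Can't zip RDDs with unequal numbers of partitions"
```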

Author: Takeshi Yamamuro linguin@gmail.com

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a 
partitioner of EdgeRDDImpl


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e224dbb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e224dbb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e224dbb0

Branch: refs/heads/master
Commit: e224dbb011789297cd6c6ba095f702c042869ed6
Parents: cef1f09
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Fri Jan 23 19:25:15 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jan 23 19:26:39 2015 -0800

--
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  |  4 ++--
 .../org/apache/spark/graphx/GraphSuite.scala| 20 
 2 files changed, 22 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e224dbb0/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 897c7ee..f1550ac 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
+import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size)))
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e224dbb0/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9da0064..ed9876b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -386,4 +386,24 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("non-default number of edge partitions") {
+  

spark git commit: [SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...

2015-01-23 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 2ea782a9d -> 73cb806f7


[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner 
of EdgeRDDImp...

If the value of 'spark.default.parallelism' does not match the number of
partitions in EdgePartition(EdgeRDDImpl),
the following error occurs in ReplicatedVertexView.scala:72:

object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
    graph.aggregateMessages(
      ctx => {
        ctx.sendToSrc(1)
        ctx.sendToDst(2)
      },
      _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions
at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.init(Dependency.scala:82)
at 
org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...

Author: Takeshi Yamamuro linguin@gmail.com

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a 
partitioner of EdgeRDDImpl

(cherry picked from commit e224dbb011789297cd6c6ba095f702c042869ed6)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73cb806f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73cb806f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73cb806f

Branch: refs/heads/branch-1.2
Commit: 73cb806f71fbc44ce2488254db177f6500fe83c7
Parents: 2ea782a
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Fri Jan 23 19:25:15 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jan 23 19:29:42 2015 -0800

--
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  |  4 ++--
 .../org/apache/spark/graphx/GraphSuite.scala| 20 
 2 files changed, 22 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 897c7ee..f1550ac 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
+import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size)))
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9da0064..ed9876b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -386,4 +386,24 @@ 

spark git commit: [SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop

2015-01-21 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e90f6b5c6 -> 37db20c94


[SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph 
generator to prevent infinite loop

I looked into GraphGenerators#chooseCell, and found that chooseCell can't
generate more edges than pow(2, (2 * (log2(numVertices)-1))) when building a
power-law graph. (Ex. numVertices:4 upperbound:4, numVertices:8 upperbound:16,
numVertices:16 upperbound:64)
If we request more edges than this upper bound, rmatGraph falls into an
infinite loop. So, how about adding an argument validation?
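To make the bound concrete, here is a small sketch (not part of the PR) of the validation formula that the patch below adds; it reproduces the numbers quoted above:

// Sketch of the upper bound the patch enforces: the maximum number of
// distinct edges chooseCell can generate for a given vertex count.
def rmatMaxEdges(numVertices: Int): Int =
  math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt

rmatMaxEdges(4)   // 4
rmatMaxEdges(8)   // 16
rmatMaxEdges(16)  // 64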

Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp

Closes #3950 from kj-ki/SPARK-5064 and squashes the following commits:

4ee18c7 [Ankur Dave] Reword error message and add unit test
d760bc7 [Kenji Kikushima] Add numEdges upperbound validation for R-MAT graph 
generator to prevent infinite loop.

(cherry picked from commit 3ee3ab592eee831d759c940eb68231817ad6d083)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37db20c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37db20c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37db20c9

Branch: refs/heads/branch-1.2
Commit: 37db20c9414d26ebd423e9500825bedc037b20f5
Parents: e90f6b5
Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp
Authored: Wed Jan 21 12:34:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Jan 21 12:38:09 2015 -0800

--
 .../org/apache/spark/graphx/util/GraphGenerators.scala|  6 ++
 .../apache/spark/graphx/util/GraphGeneratorsSuite.scala   | 10 ++
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/37db20c9/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 8a13c74..2d6a825 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -133,6 +133,12 @@ object GraphGenerators {
     // This ensures that the 4 quadrants are the same size at all recursion levels
     val numVertices = math.round(
       math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt
+    val numEdgesUpperBound =
+      math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt
+    if (numEdgesUpperBound < numEdges) {
+      throw new IllegalArgumentException(
+        s"numEdges must be <= $numEdgesUpperBound but was $numEdges")
+    }
     var edges: Set[Edge[Int]] = Set()
     while (edges.size < numEdges) {
   if (edges.size % 100 == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/37db20c9/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
--
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
index 3abefbe..8d9c8dd 100644
--- 
a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
+++ 
b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
@@ -110,4 +110,14 @@ class GraphGeneratorsSuite extends FunSuite with 
LocalSparkContext {
 }
   }
 
+  test("SPARK-5064 GraphGenerators.rmatGraph numEdges upper bound") {
+    withSpark { sc =>
+      val g1 = GraphGenerators.rmatGraph(sc, 4, 4)
+      assert(g1.edges.count() === 4)
+      intercept[IllegalArgumentException] {
+        val g2 = GraphGenerators.rmatGraph(sc, 4, 8)
+      }
+    }
+  }
+
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop

2015-01-21 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 7450a992b -> 3ee3ab592


[SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph 
generator to prevent infinite loop

I looked into GraphGenerators#chooseCell, and found that chooseCell can't
generate more edges than pow(2, (2 * (log2(numVertices)-1))) when building a
power-law graph. (Ex. numVertices:4 upperbound:4, numVertices:8 upperbound:16,
numVertices:16 upperbound:64)
If we request more edges than this upper bound, rmatGraph falls into an
infinite loop. So, how about adding an argument validation?

Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp

Closes #3950 from kj-ki/SPARK-5064 and squashes the following commits:

4ee18c7 [Ankur Dave] Reword error message and add unit test
d760bc7 [Kenji Kikushima] Add numEdges upperbound validation for R-MAT graph 
generator to prevent infinite loop.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ee3ab59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ee3ab59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ee3ab59

Branch: refs/heads/master
Commit: 3ee3ab592eee831d759c940eb68231817ad6d083
Parents: 7450a99
Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp
Authored: Wed Jan 21 12:34:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Jan 21 12:36:03 2015 -0800

--
 .../org/apache/spark/graphx/util/GraphGenerators.scala|  6 ++
 .../apache/spark/graphx/util/GraphGeneratorsSuite.scala   | 10 ++
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ee3ab59/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 8a13c74..2d6a825 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -133,6 +133,12 @@ object GraphGenerators {
     // This ensures that the 4 quadrants are the same size at all recursion levels
     val numVertices = math.round(
       math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt
+    val numEdgesUpperBound =
+      math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt
+    if (numEdgesUpperBound < numEdges) {
+      throw new IllegalArgumentException(
+        s"numEdges must be <= $numEdgesUpperBound but was $numEdges")
+    }
     var edges: Set[Edge[Int]] = Set()
     while (edges.size < numEdges) {
   if (edges.size % 100 == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3ee3ab59/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
--
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
index 3abefbe..8d9c8dd 100644
--- 
a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
+++ 
b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
@@ -110,4 +110,14 @@ class GraphGeneratorsSuite extends FunSuite with 
LocalSparkContext {
 }
   }
 
+  test("SPARK-5064 GraphGenerators.rmatGraph numEdges upper bound") {
+    withSpark { sc =>
+      val g1 = GraphGenerators.rmatGraph(sc, 4, 4)
+      assert(g1.edges.count() === 4)
+      intercept[IllegalArgumentException] {
+        val g2 = GraphGenerators.rmatGraph(sc, 4, 8)
+      }
+    }
+  }
+
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps

2015-01-08 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 8d45834de -> f825e193f


[SPARK-4917] Add a function to convert into a graph with canonical edges in 
GraphOps

Convert bi-directional edges into uni-directional ones instead of 
'canonicalOrientation' in GraphLoader.edgeListFile.
This function is useful when a graph is loaded as-is and then transformed
into one with canonical edges.
It rewrites the vertex ids of edges so that srcIds are bigger than dstIds, and 
merges the duplicated edges.
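A short usage sketch (assumes an existing SparkContext named sc; the vertex and edge values are illustrative, not from the PR):

import org.apache.spark.graphx.{Edge, Graph, VertexId}

// The reciprocal edge 2 -> 1 is folded into the canonical edge 1 -> 2, and the
// attributes of merged edges are combined with the supplied function.
val vertices = sc.parallelize(Seq[(VertexId, String)]((1L, "one"), (2L, "two"), (3L, "three")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 1L, 1), Edge(3L, 2L, 2)))
val canonical = Graph(vertices, edges).convertToCanonicalEdges((a, b) => a + b)
// canonical.edges.collect() contains Edge(1, 2, 2) and Edge(2, 3, 2)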

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3760 from maropu/ConvertToCanonicalEdgesSpike and squashes the 
following commits:

7f8b580 [Takeshi Yamamuro] Add a function to convert into a graph with 
canonical edges in GraphOps


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f825e193
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f825e193
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f825e193

Branch: refs/heads/master
Commit: f825e193f3357e60949bf4c0174675d0d1a40988
Parents: 8d45834
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Thu Jan 8 09:55:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Jan 8 09:55:12 2015 -0800

--
 .../org/apache/spark/graphx/GraphOps.scala  | 26 
 .../org/apache/spark/graphx/GraphOpsSuite.scala | 15 +++
 2 files changed, 41 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 116d1ea..dc8b478 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -279,6 +279,32 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: 
Graph[VD, ED]) extends Seriali
   }
 
   /**
+   * Convert bi-directional edges into uni-directional ones.
+   * Some graph algorithms (e.g., TriangleCount) assume that an input graph
+   * has its edges in canonical direction.
+   * This function rewrites the vertex ids of edges so that srcIds are bigger
+   * than dstIds, and merges the duplicated edges.
+   *
+   * @param mergeFunc the user defined reduce function which should
+   * be commutative and associative and is used to combine the output
+   * of the map phase
+   *
+   * @return the resulting graph with canonical edges
+   */
+  def convertToCanonicalEdges(
+      mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = {
+    val newEdges =
+      graph.edges
+        .map {
+          case e if e.srcId < e.dstId => ((e.srcId, e.dstId), e.attr)
+          case e => ((e.dstId, e.srcId), e.attr)
+        }
+        .reduceByKey(mergeFunc)
+        .map(e => new Edge(e._1._1, e._1._2, e._2))
+    Graph(graph.vertices, newEdges)
+  }
+
+  /**
* Execute a Pregel-like iterative vertex-parallel abstraction.  The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new

http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index ea94d4a..9bc8007 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -79,6 +79,21 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("convertToCanonicalEdges") {
+    withSpark { sc =>
+      val vertices =
+        sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+      val edges =
+        sc.parallelize(Seq(Edge(1, 2, 1), Edge(2, 1, 1), Edge(3, 2, 2)))
+      val g: Graph[String, Int] = Graph(vertices, edges)
+
+      val g1 = g.convertToCanonicalEdges()
+
+      val e = g1.edges.collect().toSet
+      assert(e === Set(Edge(1, 2, 1), Edge(2, 3, 2)))
+    }
+  }
+
   test("collectEdgesCycleDirectionOut") {
     withSpark { sc =>
   val graph = getCycleGraph(sc, 100)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [Minor] Fix comments for GraphX 2D partitioning strategy

2015-01-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master a6394bc2c -> 5e3ec1110


[Minor] Fix comments for GraphX 2D partitioning strategy

The sum of vertices on the matrix (v0 to v11) is 12, not 11, and one block is
shared between the row and column of blocks adjacent to a vertex in this
strategy.

This is a minor PR, so I didn't file it in JIRA.
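A worked instance of the corrected bound (added for clarity, not part of the original PR): with numParts = 9 the blocks form a 3 x 3 grid; a vertex's edges can fall only in one column (3 blocks) and one row (3 blocks), and the block at their intersection is counted once, so the vertex is replicated to at most 2 * 3 - 1 = 5 machines.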

Author: kj-ki kikushima.ke...@lab.ntt.co.jp

Closes #3904 from kj-ki/fix-partitionstrategy-comments and squashes the 
following commits:

79829d9 [kj-ki] Fix comments for 2D partitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e3ec111
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e3ec111
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e3ec111

Branch: refs/heads/master
Commit: 5e3ec1110495899a298313c4aa9c6c151c1f54da
Parents: a6394bc
Author: kj-ki kikushima.ke...@lab.ntt.co.jp
Authored: Tue Jan 6 09:49:37 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Jan 6 09:49:37 2015 -0800

--
 .../main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5e3ec111/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 13033fe..7372dfb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,9 +32,9 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
   /**
* Assigns edges to partitions using a 2D partitioning of the sparse edge 
adjacency matrix,
-   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
+   * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
*
-   * Suppose we have a graph with 11 vertices that we want to partition
+   * Suppose we have a graph with 12 vertices that we want to partition
* over 9 machines.  We can use the following sparse matrix representation:
*
 * <pre>
@@ -61,7 +61,7 @@ object PartitionStrategy {
* that edges adjacent to `v11` can only be in the first column of blocks 
`(P0, P3,
* P6)` or the last
* row of blocks `(P6, P7, P8)`.  As a consequence we can guarantee that 
`v11` will need to be
-   * replicated to at most `2 * sqrt(numParts)` machines.
+   * replicated to at most `2 * sqrt(numParts) - 1` machines.
*
* Notice that `P0` has many edges and as a consequence this partitioning 
would lead to poor work
* balance.  To improve balance we first multiply each vertex id by a large 
prime to shuffle the


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

2014-12-07 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master e895e0cbe -> 2e6b736b0


[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

This patch just replaces the native quick sorter with Sorter(TimSort) in Spark.
It gave performance gains of about 8% in my quick experiments.
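For reference, a minimal sketch of the new sort call, mirroring the hunk below; note that Sorter, Edge.lexicographicOrdering and Edge.edgeArraySortDataFormat are package-private, so this compiles only inside org.apache.spark.graphx (as in EdgePartitionBuilder, where the patch places it):

import org.apache.spark.graphx.Edge
import org.apache.spark.util.collection.Sorter

// TimSort over the raw edge array via the SortDataFormat added in this patch.
val edgeArray: Array[Edge[Int]] = Array(Edge(2L, 1L, 0), Edge(1L, 3L, 0), Edge(1L, 2L, 0))
new Sorter(Edge.edgeArraySortDataFormat[Int])
  .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering[Int])
// edgeArray is now ordered by (srcId, dstId): (1, 2), (1, 3), (2, 1)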

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the 
following commits:

8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import
3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with 
Sorter(TimSort) in Spark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e6b736b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e6b736b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e6b736b

Branch: refs/heads/master
Commit: 2e6b736b0e6e5920d0523533c87832a53211db42
Parents: e895e0c
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Sun Dec 7 19:36:08 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sun Dec 7 19:37:14 2014 -0800

--
 .../scala/org/apache/spark/graphx/Edge.scala| 30 +++
 .../graphx/impl/EdgePartitionBuilder.scala  | 39 +---
 2 files changed, 64 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2e6b736b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 7e842ec..ecc37dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import org.apache.spark.util.collection.SortDataFormat
+
 /**
  * A single directed edge consisting of a source id, target id,
  * and the data associated with the edge.
@@ -65,4 +67,32 @@ object Edge {
   else 1
 }
   }
+
+  private[graphx] def edgeArraySortDataFormat[ED] = new 
SortDataFormat[Edge[ED], Array[Edge[ED]]] {
+override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = {
+  data(pos)
+}
+
+override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
+  val tmp = data(pos0)
+  data(pos0) = data(pos1)
+  data(pos1) = tmp
+}
+
+override def copyElement(
+src: Array[Edge[ED]], srcPos: Int,
+dst: Array[Edge[ED]], dstPos: Int) {
+  dst(dstPos) = src(srcPos)
+}
+
+override def copyRange(
+src: Array[Edge[ED]], srcPos: Int,
+dst: Array[Edge[ED]], dstPos: Int, length: Int) {
+  System.arraycopy(src, srcPos, dst, dstPos, length)
+}
+
+override def allocate(length: Int): Array[Edge[ED]] = {
+  new Array[Edge[ED]](length)
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e6b736b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index b0cb0fe..409cf60 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.graphx.impl
 
 import scala.reflect.ClassTag
-import scala.util.Sorting
-
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.{SortDataFormat, Sorter, 
PrimitiveVector}
 
 /** Constructs an EdgePartition from scratch. */
 private[graphx]
@@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 
   def toEdgePartition: EdgePartition[ED, VD] = {
 val edgeArray = edges.trim().array
-Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+new Sorter(Edge.edgeArraySortDataFormat[ED])
+  .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
 val localSrcIds = new Array[Int](edgeArray.size)
 val localDstIds = new Array[Int](edgeArray.size)
 val data = new Array[ED](edgeArray.size)
@@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
 
   def toEdgePartition: EdgePartition[ED, VD] = {
 val edgeArray = edges.trim().array
-Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
+new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
+  .sort(edgeArray, 0, edgeArray.length, 

spark git commit: [SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

2014-12-07 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 27d9f13af -> a4ae7c8b5


[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

This patch just replaces the native quick sorter with Sorter(TimSort) in Spark.
It gave performance gains of about 8% in my quick experiments.

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the 
following commits:

8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import
3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with 
Sorter(TimSort) in Spark

(cherry picked from commit 2e6b736b0e6e5920d0523533c87832a53211db42)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4ae7c8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4ae7c8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4ae7c8b

Branch: refs/heads/branch-1.2
Commit: a4ae7c8b533b3998484879439c0982170c3c38a7
Parents: 27d9f13
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Sun Dec 7 19:36:08 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sun Dec 7 19:37:32 2014 -0800

--
 .../scala/org/apache/spark/graphx/Edge.scala| 30 +++
 .../graphx/impl/EdgePartitionBuilder.scala  | 39 +---
 2 files changed, 64 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 7e842ec..ecc37dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import org.apache.spark.util.collection.SortDataFormat
+
 /**
  * A single directed edge consisting of a source id, target id,
  * and the data associated with the edge.
@@ -65,4 +67,32 @@ object Edge {
   else 1
 }
   }
+
+  private[graphx] def edgeArraySortDataFormat[ED] = new 
SortDataFormat[Edge[ED], Array[Edge[ED]]] {
+override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = {
+  data(pos)
+}
+
+override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
+  val tmp = data(pos0)
+  data(pos0) = data(pos1)
+  data(pos1) = tmp
+}
+
+override def copyElement(
+src: Array[Edge[ED]], srcPos: Int,
+dst: Array[Edge[ED]], dstPos: Int) {
+  dst(dstPos) = src(srcPos)
+}
+
+override def copyRange(
+src: Array[Edge[ED]], srcPos: Int,
+dst: Array[Edge[ED]], dstPos: Int, length: Int) {
+  System.arraycopy(src, srcPos, dst, dstPos, length)
+}
+
+override def allocate(length: Int): Array[Edge[ED]] = {
+  new Array[Edge[ED]](length)
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index b0cb0fe..409cf60 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.graphx.impl
 
 import scala.reflect.ClassTag
-import scala.util.Sorting
-
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.{SortDataFormat, Sorter, 
PrimitiveVector}
 
 /** Constructs an EdgePartition from scratch. */
 private[graphx]
@@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 
   def toEdgePartition: EdgePartition[ED, VD] = {
 val edgeArray = edges.trim().array
-Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+new Sorter(Edge.edgeArraySortDataFormat[ED])
+  .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
 val localSrcIds = new Array[Int](edgeArray.size)
 val localDstIds = new Array[Int](edgeArray.size)
 val data = new Array[ED](edgeArray.size)
@@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
 
   def toEdgePartition: EdgePartition[ED, VD] = {
 val edgeArray = edges.trim().array
-Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
+new 

spark git commit: [SPARK-4620] Add unpersist in Graph and GraphImpl

2014-12-07 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 2e6b736b0 -> 8817fc7fe


[SPARK-4620] Add unpersist in Graph and GraphImpl

Add an interface to uncache both vertices and edges of Graph/GraphImpl.
This interface is useful when iterative graph operations build a new graph in
each iteration, and the vertices and edges of previous iterations are no longer
needed by the following iterations.
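A hedged usage sketch of the pattern this enables (the step function and iteration count are placeholders, not from the PR):

import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

// An iterative algorithm that builds a new graph each round and releases the
// previous one with the unpersist() added by this patch.
def iterate[VD: ClassTag, ED: ClassTag](
    initial: Graph[VD, ED],
    numIter: Int)(step: Graph[VD, ED] => Graph[VD, ED]): Graph[VD, ED] = {
  var g = initial.cache()
  for (_ <- 1 to numIter) {
    val next = step(g).cache()
    next.edges.foreachPartition(_ => ())  // materialize before dropping the parent
    g.unpersist(blocking = false)
    g = next
  }
  g
}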

Author: Takeshi Yamamuro linguin@gmail.com

This patch had conflicts when merged, resolved by
Committer: Ankur Dave ankurd...@gmail.com

Closes #3476 from maropu/UnpersistInGraphSpike and squashes the following 
commits:

77a006a [Takeshi Yamamuro] Add unpersist in Graph and GraphImpl


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8817fc7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8817fc7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8817fc7f

Branch: refs/heads/master
Commit: 8817fc7fe8785d7b11138ca744f22f7e70f1f0a0
Parents: 2e6b736
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Sun Dec 7 19:42:02 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sun Dec 7 19:42:02 2014 -0800

--
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala  | 6 ++
 .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala| 6 ++
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8817fc7f/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 23538b7..84b72b3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -105,6 +105,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected 
() extends Serializab
   def checkpoint(): Unit
 
   /**
+   * Uncaches both vertices and edges of this graph. This is useful in 
iterative algorithms that
+   * build a new graph in each iteration.
+   */
+  def unpersist(blocking: Boolean = true): Graph[VD, ED]
+
+  /**
* Uncaches only the vertices of this graph, leaving the edges alone. This 
is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This 
method can be used to
* uncache the vertex attributes of previous iterations once they are no 
longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/8817fc7f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index a617d84..3f4a900 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 replicatedVertexView.edges.checkpoint()
   }
 
+  override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
+unpersistVertices(blocking)
+replicatedVertexView.edges.unpersist(blocking)
+this
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
 vertices.unpersist(blocking)
 // TODO: unpersist the replicated vertices in `replicatedVertexView` but 
leave the edges alone


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4620] Add unpersist in Graph and GraphImpl

2014-12-07 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a4ae7c8b5 -> 6b9e8b081


[SPARK-4620] Add unpersist in Graph and GraphImpl

Add an interface to uncache both vertices and edges of Graph/GraphImpl.
This interface is useful when iterative graph operations build a new graph in
each iteration, and the vertices and edges of previous iterations are no longer
needed by the following iterations.

Author: Takeshi Yamamuro linguin@gmail.com

This patch had conflicts when merged, resolved by
Committer: Ankur Dave ankurd...@gmail.com

Closes #3476 from maropu/UnpersistInGraphSpike and squashes the following 
commits:

77a006a [Takeshi Yamamuro] Add unpersist in Graph and GraphImpl

(cherry picked from commit 8817fc7fe8785d7b11138ca744f22f7e70f1f0a0)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b9e8b08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b9e8b08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b9e8b08

Branch: refs/heads/branch-1.2
Commit: 6b9e8b081655f71f7ff2c4238254f7aaa110723c
Parents: a4ae7c8
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Sun Dec 7 19:42:02 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sun Dec 7 19:42:29 2014 -0800

--
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala  | 6 ++
 .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala| 6 ++
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6b9e8b08/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 23538b7..84b72b3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -105,6 +105,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected 
() extends Serializab
   def checkpoint(): Unit
 
   /**
+   * Uncaches both vertices and edges of this graph. This is useful in 
iterative algorithms that
+   * build a new graph in each iteration.
+   */
+  def unpersist(blocking: Boolean = true): Graph[VD, ED]
+
+  /**
* Uncaches only the vertices of this graph, leaving the edges alone. This 
is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This 
method can be used to
* uncache the vertex attributes of previous iterations once they are no 
longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/6b9e8b08/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index a617d84..3f4a900 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 replicatedVertexView.edges.checkpoint()
   }
 
+  override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
+unpersistVertices(blocking)
+replicatedVertexView.edges.unpersist(blocking)
+this
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
 vertices.unpersist(blocking)
 // TODO: unpersist the replicated vertices in `replicatedVertexView` but 
leave the edges alone


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation

2014-12-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 6eb1b6f62 -> e895e0cbe


[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li wi...@qq.com

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e895e0cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e895e0cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e895e0cb

Branch: refs/heads/master
Commit: e895e0cbecbbec1b412ff21321e57826d2d0a982
Parents: 6eb1b6f
Author: GuoQiang Li wi...@qq.com
Authored: Sat Dec 6 00:56:51 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sat Dec 6 00:56:51 2014 -0800

--
 .../scala/org/apache/spark/graphx/Graph.scala   |  8 
 .../apache/spark/graphx/impl/GraphImpl.scala|  5 +
 .../org/apache/spark/graphx/GraphSuite.scala| 21 
 3 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915..23538b7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected 
() extends Serializab
   def cache(): Graph[VD, ED]
 
   /**
+   * Mark this Graph for checkpointing. It will be saved to a file inside the 
checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to 
its parent
+   * RDDs will be removed. It is strongly recommended that this Graph is 
persisted in
+   * memory, otherwise saving it on a file will require recomputation.
+   */
+  def checkpoint(): Unit
+
+  /**
* Uncaches only the vertices of this graph, leaving the edges alone. This 
is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This 
method can be used to
* uncache the vertex attributes of previous iterations once they are no 
longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6..a617d84 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 this
   }
 
+  override def checkpoint(): Unit = {
+vertices.checkpoint()
+replicatedVertexView.edges.checkpoint()
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
 vertices.unpersist(blocking)
 // TODO: unpersist the replicated vertices in `replicatedVertexView` but 
leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1dd..9da0064 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import com.google.common.io.Files
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("checkpoint") {
+    val checkpointDir = Files.createTempDir()
+    checkpointDir.deleteOnExit()
+    withSpark { sc =>
+      sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+      val rdd = sc.parallelize(ring)
+      val graph = Graph.fromEdges(rdd, 1.0F)
+      graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
+      val edgesDependencies = graph.edges.partitionsRDD.dependencies
+      val verticesDependencies = 

spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation

2014-12-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 11446a648 -> 27d9f13af


[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li wi...@qq.com

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation

(cherry picked from commit e895e0cbecbbec1b412ff21321e57826d2d0a982)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d9f13a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d9f13a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d9f13a

Branch: refs/heads/branch-1.2
Commit: 27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0
Parents: 11446a6
Author: GuoQiang Li wi...@qq.com
Authored: Sat Dec 6 00:56:51 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Sat Dec 6 00:57:02 2014 -0800

--
 .../scala/org/apache/spark/graphx/Graph.scala   |  8 
 .../apache/spark/graphx/impl/GraphImpl.scala|  5 +
 .../org/apache/spark/graphx/GraphSuite.scala| 21 
 3 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915..23538b7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected 
() extends Serializab
   def cache(): Graph[VD, ED]
 
   /**
+   * Mark this Graph for checkpointing. It will be saved to a file inside the 
checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to 
its parent
+   * RDDs will be removed. It is strongly recommended that this Graph is 
persisted in
+   * memory, otherwise saving it on a file will require recomputation.
+   */
+  def checkpoint(): Unit
+
+  /**
* Uncaches only the vertices of this graph, leaving the edges alone. This 
is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This 
method can be used to
* uncache the vertex attributes of previous iterations once they are no 
longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6..a617d84 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 this
   }
 
+  override def checkpoint(): Unit = {
+vertices.checkpoint()
+replicatedVertexView.edges.checkpoint()
+  }
+
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
 vertices.unpersist(blocking)
 // TODO: unpersist the replicated vertices in `replicatedVertexView` but 
leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1dd..9da0064 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import com.google.common.io.Files
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("checkpoint") {
+    val checkpointDir = Files.createTempDir()
+    checkpointDir.deleteOnExit()
+    withSpark { sc =>
+      sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+      val rdd = sc.parallelize(ring)
+      val graph = Graph.fromEdges(rdd, 1.0F)
+      graph.checkpoint()
+      graph.edges.map(_.attr).count()
+      graph.vertices.map(_._2).count()
+
+      val 

spark git commit: [SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain

2014-12-03 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 17c162f66 -> 77be8b986


[SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that
references EdgeRDD/VertexRDD, which causes the task's serialization chain to
become very long in iterative GraphX applications. As a result, a
StackOverflowError occurs. If we set f = null in `clearDependencies()`,
checkpoint() can cut off the long serialization chain. More details and
explanation can be found in the JIRA.
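An illustrative sketch (simplified classes, not Spark's actual ones) of why the captured $outer matters: a function value created inside a class captures `this` whenever it touches a member, so serializing the function drags the whole enclosing object, and everything it references, along with it.

// Simplified illustration of the $outer capture problem.
class HeavyWrapper(val payload: Array[Byte]) extends Serializable {
  private def combine(x: Int, y: Int): Int = x + y

  // This function value closes over `this` because it calls combine() on the
  // enclosing instance; serializing it also serializes `payload`.
  def makeZipFunction(): (Iterator[Int], Iterator[Int]) => Iterator[Int] =
    (a, b) => a.zip(b).map { case (x, y) => combine(x, y) }
}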

Author: JerryLead jerryl...@163.com
Author: Lijie Xu csxuli...@gmail.com

Closes #3545 from JerryLead/my_core and squashes the following commits:

f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77be8b98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77be8b98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77be8b98

Branch: refs/heads/master
Commit: 77be8b986fd21b7bbe28aa8db1042cb22bc74fe7
Parents: 17c162f
Author: JerryLead jerryl...@163.com
Authored: Tue Dec 2 23:53:29 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Dec 2 23:53:29 2014 -0800

--
 .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala| 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77be8b98/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 996f2cd..95b2dd9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -77,7 +77,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: 
ClassTag](
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: 
ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 preservesPartitioning: Boolean = false)
@@ -92,13 +92,14 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: 
ClassTag, V: ClassTag]
 super.clearDependencies()
 rdd1 = null
 rdd2 = null
+f = null
   }
 }
 
 private[spark] class ZippedPartitionsRDD3
   [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 var rdd3: RDD[C],
@@ -117,13 +118,14 @@ private[spark] class ZippedPartitionsRDD3
 rdd1 = null
 rdd2 = null
 rdd3 = null
+f = null
   }
 }
 
 private[spark] class ZippedPartitionsRDD4
   [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 var rdd3: RDD[C],
@@ -145,5 +147,6 @@ private[spark] class ZippedPartitionsRDD4
 rdd2 = null
 rdd3 = null
 rdd4 = null
+f = null
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain

2014-12-03 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 528cce8bc -> 667f7ff44


[SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that
references EdgeRDD/VertexRDD, which causes the task's serialization chain to
become very long in iterative GraphX applications. As a result, a
StackOverflowError occurs. If we set f = null in `clearDependencies()`,
checkpoint() can cut off the long serialization chain. More details and
explanation can be found in the JIRA.

Author: JerryLead jerryl...@163.com
Author: Lijie Xu csxuli...@gmail.com

Closes #3545 from JerryLead/my_core and squashes the following commits:

f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master

(cherry picked from commit 77be8b986fd21b7bbe28aa8db1042cb22bc74fe7)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/667f7ff4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/667f7ff4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/667f7ff4

Branch: refs/heads/branch-1.2
Commit: 667f7ff440dea9b83dbf3910f26d8dbf82d343a5
Parents: 528cce8
Author: JerryLead jerryl...@163.com
Authored: Tue Dec 2 23:53:29 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Dec 2 23:53:38 2014 -0800

--
 .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala| 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/667f7ff4/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 996f2cd..95b2dd9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -77,7 +77,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: 
ClassTag](
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: 
ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 preservesPartitioning: Boolean = false)
@@ -92,13 +92,14 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: 
ClassTag, V: ClassTag]
 super.clearDependencies()
 rdd1 = null
 rdd2 = null
+f = null
   }
 }
 
 private[spark] class ZippedPartitionsRDD3
   [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 var rdd3: RDD[C],
@@ -117,13 +118,14 @@ private[spark] class ZippedPartitionsRDD3
 rdd1 = null
 rdd2 = null
 rdd3 = null
+f = null
   }
 }
 
 private[spark] class ZippedPartitionsRDD4
   [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag](
 sc: SparkContext,
-    f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+    var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
 var rdd1: RDD[A],
 var rdd2: RDD[B],
 var rdd3: RDD[C],
@@ -145,5 +147,6 @@ private[spark] class ZippedPartitionsRDD4
 rdd2 = null
 rdd3 = null
 rdd4 = null
+f = null
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage

2014-12-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 5da21f07d -> fc0a1475e


[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

Iterative GraphX applications always have long lineage, while checkpoint() on
EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we
perform checkpoint() on their PartitionsRDD, the long lineage can be cut off.
Moreover, existing operations such as cache() in this code are performed on
the PartitionsRDD, so checkpoint() should work the same way. More details and
explanation can be found in the JIRA.
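A hedged usage sketch of the pattern this change supports (assumes an existing SparkContext `sc`; the checkpoint directory, graph, and interval are illustrative, not from the PR):

import org.apache.spark.graphx.{Edge, Graph}

// Periodically checkpoint the working graph so the lineage, and with it the
// task serialization cost, stays bounded across iterations.
sc.setCheckpointDir("/tmp/graphx-checkpoints")
var g = Graph.fromEdges(sc.parallelize(Seq(Edge(1L, 2L, 1.0))), 0.0).cache()
for (i <- 1 to 100) {
  g = g.mapVertices((_, attr) => attr + 1.0).cache()
  if (i % 25 == 0) {
    // With this patch these calls checkpoint the underlying partitionsRDD,
    // which is what actually cuts the lineage.
    g.vertices.checkpoint()
    g.edges.checkpoint()
    g.vertices.count()   // run a job so the checkpoint files are written
    g.edges.count()
  }
}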

Author: JerryLead jerryl...@163.com
Author: Lijie Xu csxuli...@gmail.com

Closes #3549 from JerryLead/my_graphX_checkpoint and squashes the following 
commits:

d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and 
EdgeRDD themselves
ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc0a1475
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc0a1475
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc0a1475

Branch: refs/heads/master
Commit: fc0a1475ef7c8b33363d88adfe8e8f28def5afc7
Parents: 5da21f0
Author: JerryLead jerryl...@163.com
Authored: Tue Dec 2 17:08:02 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Dec 2 17:08:02 2014 -0800

--
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala| 4 
 .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala  | 4 
 2 files changed, 8 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc0a1475/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index a816961..504559d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] 
private[graphx] (
 this
   }
 
+  override def checkpoint() = {
+partitionsRDD.checkpoint()
+  }
+
   /** The number of edges in the RDD. */
   override def count(): Long = {
 partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0a1475/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index d92a55a..c8898b1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] (
 this
   }
 
+  override def checkpoint() = {
+partitionsRDD.checkpoint()
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
 partitionsRDD.map(_.size).reduce(_ + _)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage

2014-12-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 5e026a3e6 - f1859fc18


[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

Iterative GraphX applications always have long lineage, and calling checkpoint() on 
EdgeRDD and VertexRDD themselves cannot shorten it. In contrast, performing 
checkpoint() on their partitionsRDD cuts the long lineage off. Moreover, the existing 
operations such as cache() in this code are already performed on the partitionsRDD, so 
checkpoint() should work the same way. More details and explanation can be found in 
the JIRA.

Author: JerryLead jerryl...@163.com
Author: Lijie Xu csxuli...@gmail.com

Closes #3549 from JerryLead/my_graphX_checkpoint and squashes the following 
commits:

d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and 
EdgeRDD themselves
ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master

(cherry picked from commit fc0a1475ef7c8b33363d88adfe8e8f28def5afc7)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1859fc1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1859fc1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1859fc1

Branch: refs/heads/branch-1.2
Commit: f1859fc189d9657381fbe82795420de34cad4025
Parents: 5e026a3
Author: JerryLead jerryl...@163.com
Authored: Tue Dec 2 17:08:02 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Dec 2 17:11:05 2014 -0800

--
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala| 4 
 .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala  | 4 
 2 files changed, 8 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f1859fc1/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index a816961..504559d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] 
private[graphx] (
 this
   }
 
+  override def checkpoint() = {
+partitionsRDD.checkpoint()
+  }
+
   /** The number of edges in the RDD. */
   override def count(): Long = {
 partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1859fc1/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index d92a55a..c8898b1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] (
 this
   }
 
+  override def checkpoint() = {
+partitionsRDD.checkpoint()
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
 partitionsRDD.map(_.size).reduce(_ + _)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4672][GraphX]Non-transient PartitionsRDDs will lead to StackOverflow error

2014-12-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master fc0a1475e - 17c162f66


[SPARK-4672][GraphX]Non-transient PartitionsRDDs will lead to StackOverflow 
error

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

In a nutshell, if the `val partitionsRDD` fields in EdgeRDDImpl and VertexRDDImpl are 
non-transient, the serialization chain can become very long in iterative algorithms 
and eventually leads to a StackOverflowError. More details and explanation can be 
found in the JIRA.
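
As a toy illustration (deliberately not Spark code) of why the non-transient reference 
matters: Java serialization recurses once per object link, so a field that chains 
wrappers together must be cut with @transient or the write eventually overflows the 
stack.

  import java.io._

  class Node(@transient val prev: Node) extends Serializable
  var n = new Node(null)
  for (_ <- 1 to 1000000) n = new Node(n)                  // build a very deep chain
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(n)   // fine here; with a plain 'val prev' the recursion would throw StackOverflowError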

Author: JerryLead jerryl...@163.com
Author: Lijie Xu csxuli...@gmail.com

Closes #3544 from JerryLead/my_graphX and squashes the following commits:

628f33c [JerryLead] set PartitionsRDD to be transient in EdgeRDDImpl and 
VertexRDDImpl
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17c162f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17c162f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17c162f6

Branch: refs/heads/master
Commit: 17c162f6682520e6e2790626e37da3a074471793
Parents: fc0a147
Author: JerryLead jerryl...@163.com
Authored: Tue Dec 2 17:14:11 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Dec 2 17:14:11 2014 -0800

--
 .../src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala  | 2 +-
 .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/17c162f6/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 504559d..897c7ee 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 
 class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
-override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+@transient override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, 
VD])],
 val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends EdgeRDD[ED](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/17c162f6/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index c8898b1..9732c5b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 
 class VertexRDDImpl[VD] private[graphx] (
-val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+@transient val partitionsRDD: RDD[ShippableVertexPartition[VD]],
 val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   (implicit override protected val vdTag: ClassTag[VD])
   extends VertexRDD[VD](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

2014-11-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 23eaf0e12 - d15c6e9dc


[SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

At this point srcIds has not been filled in yet, so its entries are all 0; we therefore 
use edgeArray(0).srcId to initialize the index entry and currSrcId.

Author: lianhuiwang lianhuiwan...@gmail.com

Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits:

3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d15c6e9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d15c6e9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d15c6e9d

Branch: refs/heads/master
Commit: d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d
Parents: 23eaf0e
Author: lianhuiwang lianhuiwan...@gmail.com
Authored: Thu Nov 6 10:46:45 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Nov 6 10:46:45 2014 -0800

--
 .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d15c6e9d/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb..2b6137b 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 // Copy edges into columnar structures, tracking the beginnings of source 
vertex id clusters and
 // adding them to the index
 if (edgeArray.length > 0) {
-  index.update(srcIds(0), 0)
-  var currSrcId: VertexId = srcIds(0)
+  index.update(edgeArray(0).srcId, 0)
+  var currSrcId: VertexId = edgeArray(0).srcId
   var i = 0
   while (i < edgeArray.size) {
 srcIds(i) = edgeArray(i).srcId


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

2014-11-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 aaaeaf939 - 9061bc4e1


[SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

At this point srcIds has not been filled in yet, so its entries are all 0; we therefore 
use edgeArray(0).srcId to initialize the index entry and currSrcId.

Author: lianhuiwang lianhuiwan...@gmail.com

Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits:

3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx

(cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9061bc4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9061bc4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9061bc4e

Branch: refs/heads/branch-1.2
Commit: 9061bc4e127abb0c44e37f1b8b7706883d451bc7
Parents: aaaeaf9
Author: lianhuiwang lianhuiwan...@gmail.com
Authored: Thu Nov 6 10:46:45 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Nov 6 10:47:49 2014 -0800

--
 .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9061bc4e/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb..2b6137b 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 // Copy edges into columnar structures, tracking the beginnings of source 
vertex id clusters and
 // adding them to the index
 if (edgeArray.length > 0) {
-  index.update(srcIds(0), 0)
-  var currSrcId: VertexId = srcIds(0)
+  index.update(edgeArray(0).srcId, 0)
+  var currSrcId: VertexId = edgeArray(0).srcId
   var i = 0
   while (i < edgeArray.size) {
 srcIds(i) = edgeArray(i).srcId


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

2014-11-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 c58c1bb83 - 0a40eac25


[SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

At this point srcIds has not been filled in yet, so its entries are all 0; we therefore 
use edgeArray(0).srcId to initialize the index entry and currSrcId.

Author: lianhuiwang lianhuiwan...@gmail.com

Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits:

3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx

(cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a40eac2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a40eac2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a40eac2

Branch: refs/heads/branch-1.1
Commit: 0a40eac25a0202c492f58e3c97a96a35ceed6ce8
Parents: c58c1bb
Author: lianhuiwang lianhuiwan...@gmail.com
Authored: Thu Nov 6 10:46:45 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Nov 6 10:48:32 2014 -0800

--
 .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a40eac2/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb..2b6137b 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 // Copy edges into columnar structures, tracking the beginnings of source 
vertex id clusters and
 // adding them to the index
 if (edgeArray.length > 0) {
-  index.update(srcIds(0), 0)
-  var currSrcId: VertexId = srcIds(0)
+  index.update(edgeArray(0).srcId, 0)
+  var currSrcId: VertexId = edgeArray(0).srcId
   var i = 0
   while (i < edgeArray.size) {
 srcIds(i) = edgeArray(i).srcId


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

2014-11-06 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 49224fd0f - 76c20cac9


[SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx

At this point srcIds has not been filled in yet, so its entries are all 0; we therefore 
use edgeArray(0).srcId to initialize the index entry and currSrcId.

Author: lianhuiwang lianhuiwan...@gmail.com

Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits:

3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx

(cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76c20cac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76c20cac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76c20cac

Branch: refs/heads/branch-1.0
Commit: 76c20cac99437ac07bcc9d71e295fd49fd465f4e
Parents: 49224fd
Author: lianhuiwang lianhuiwan...@gmail.com
Authored: Thu Nov 6 10:46:45 2014 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Nov 6 10:49:44 2014 -0800

--
 .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/76c20cac/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb..2b6137b 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 // Copy edges into columnar structures, tracking the beginnings of source 
vertex id clusters and
 // adding them to the index
 if (edgeArray.length > 0) {
-  index.update(srcIds(0), 0)
-  var currSrcId: VertexId = srcIds(0)
+  index.update(edgeArray(0).srcId, 0)
+  var currSrcId: VertexId = edgeArray(0).srcId
   var i = 0
   while (i < edgeArray.size) {
 srcIds(i) = edgeArray(i).srcId


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [graphX] GraphOps: random pick vertex bug

2014-09-29 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 0bbe7faef - 51229ff7f


[graphX] GraphOps: random pick vertex bug

When `numVertices > 50`, the computed probability is 0 because `50 / graph.numVertices` 
uses integer division. This would cause an infinite loop.
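
A standalone illustration of the integer-division problem, with a made-up vertex count:

  val numVertices = 60L
  val buggyProbability = 50 / numVertices     // Long division: 0, so no vertex is ever accepted
  val fixedProbability = 50.0 / numVertices   // Double division: 0.833...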

Author: yingjieMiao ying...@42go.com

Closes #2553 from yingjieMiao/graphx and squashes the following commits:

6adf3c8 [yingjieMiao] [graphX] GraphOps: random pick vertex bug


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51229ff7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51229ff7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51229ff7

Branch: refs/heads/master
Commit: 51229ff7f4d3517706a1cdc1a2943ede1c605089
Parents: 0bbe7fa
Author: yingjieMiao ying...@42go.com
Authored: Mon Sep 29 18:01:27 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Mon Sep 29 18:01:27 2014 -0700

--
 graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51229ff7/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 02afaa9..d0dd45d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -254,7 +254,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, 
ED]) extends Seriali
* Picks a random vertex from the graph and returns its ID.
*/
   def pickRandomVertex(): VertexId = {
-val probability = 50 / graph.numVertices
+val probability = 50.0 / graph.numVertices
 var found = false
 var retVal: VertexId = null.asInstanceOf[VertexId]
 while (!found) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

2014-09-19 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 cf15b22d4 - 1687d6ba9


[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

VertexRDD.apply had a bug where it ignored the merge function for
duplicate vertices and instead used whichever vertex attribute occurred
first. This commit fixes the bug by passing the merge function through
to ShippableVertexPartition.apply, which merges any duplicates using the
merge function and then fills in missing vertices using the specified
default vertex attribute. This commit also adds a unit test for
VertexRDD.apply.
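
A hedged usage sketch of the corrected behaviour (sc and the sample data are 
assumptions): duplicate entries for a vertex id are now combined with the supplied 
merge function instead of keeping whichever occurred first.

  val verts = sc.parallelize(Seq((1L, 10), (1L, 32), (2L, 5)))
  val graph = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L))), 0)
  val merged = VertexRDD(verts, graph.edges, 0, (a: Int, b: Int) => a + b)
  // after this fix, merged contains (1L, 42) rather than an arbitrary one of 10 or 32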

Author: Larry Xiao xia...@sjtu.edu.cn
Author: Blie Arkansol xia...@sjtu.edu.cn
Author: Ankur Dave ankurd...@gmail.com

Closes #1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on 
default values
614059f [Larry Xiao] doc update: note about the default null value vertices 
construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without 
mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is 
handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the 
mergeFunc

(cherry picked from commit 3bbbdd8180cf316c6f8dde0e879410b6b29f8cc3)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1687d6ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1687d6ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1687d6ba

Branch: refs/heads/branch-1.1
Commit: 1687d6ba95e5335a1445e0e392170a2d462bd356
Parents: cf15b22
Author: Larry Xiao xia...@sjtu.edu.cn
Authored: Thu Sep 18 23:32:32 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Sep 18 23:37:22 2014 -0700

--
 .../org/apache/spark/graphx/VertexRDD.scala |  4 +--
 .../graphx/impl/ShippableVertexPartition.scala  | 28 
 .../apache/spark/graphx/VertexRDDSuite.scala| 11 
 3 files changed, 36 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1687d6ba/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 04fbc9d..2c8b245 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -392,7 +392,7 @@ object VertexRDD {
*/
   def apply[VD: ClassTag](
   vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): 
VertexRDD[VD] = {
-VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+VertexRDD(vertices, edges, defaultVal, (a, b) => a)
   }
 
   /**
@@ -419,7 +419,7 @@ object VertexRDD {
   (vertexIter, routingTableIter) =>
 val routingTable =
   if (routingTableIter.hasNext) routingTableIter.next() else 
RoutingTablePartition.empty
-Iterator(ShippableVertexPartition(vertexIter, routingTable, 
defaultVal))
+Iterator(ShippableVertexPartition(vertexIter, routingTable, 
defaultVal, mergeFunc))
 }
 new VertexRDD(vertexPartitions)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1687d6ba/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index dca54b8..5412d72 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -36,7 +36,7 @@ private[graphx]
 object ShippableVertexPartition {
   /** Construct a `ShippableVertexPartition` from the given vertices without 
any routing table. */
   def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): 
ShippableVertexPartition[VD] =
-apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
-apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)
 
   /**
* Construct a `ShippableVertexPartition

git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

2014-09-04 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 f41c45a75 - 8c40ab5c0


[HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.

Author: Ankur Dave ankurd...@gmail.com

Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:

10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD 
zipPartitions

(cherry picked from commit 00362dac976cd05b06638deb11d990d612429e0b)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c40ab5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c40ab5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c40ab5c

Branch: refs/heads/branch-1.1
Commit: 8c40ab5c06ab72e85a8a9d4272fed0e81eca1d3a
Parents: f41c45a
Author: Ankur Dave ankurd...@gmail.com
Authored: Wed Sep 3 23:49:47 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Sep 3 23:50:11 2014 -0700

--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 
 2 files changed, 2 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8c40ab5c/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 0f1a101..899a3cb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark._
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-partitionsRDD.partitioner.orElse(Some(new 
HashPartitioner(partitionsRDD.partitions.size)))
+
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
 val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, 
context)

http://git-wip-us.apache.org/repos/asf/spark/blob/8c40ab5c/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index eaaa449..6506bac 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
-  test("non-default number of edge partitions") {
-val n = 10
-val defaultParallelism = 3
-val numEdgePartitions = 4
-assert(defaultParallelism != numEdgePartitions)
-val conf = new SparkConf()
-  .set("spark.default.parallelism", defaultParallelism.toString)
-val sc = new SparkContext("local", "test", conf)
-val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
-  numEdgePartitions)
-val graph = Graph.fromEdgeTuples(edges, 1)
-val neighborAttrSums = graph.mapReduceTriplets[Int](
-  et => Iterator((et.dstId, et.srcAttr)), _ + _)
-assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
-  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

2014-09-04 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 1bed0a386 - 00362dac9


[HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.

Author: Ankur Dave ankurd...@gmail.com

Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:

10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD 
zipPartitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00362dac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00362dac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00362dac

Branch: refs/heads/master
Commit: 00362dac976cd05b06638deb11d990d612429e0b
Parents: 1bed0a3
Author: Ankur Dave ankurd...@gmail.com
Authored: Wed Sep 3 23:49:47 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Sep 3 23:49:47 2014 -0700

--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 
 2 files changed, 2 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 35fbd47..5bcb96b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark._
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-partitionsRDD.partitioner.orElse(Some(new 
HashPartitioner(partitionsRDD.partitions.size)))
+
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
 val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, 
context)

http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index eaaa449..6506bac 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
-  test("non-default number of edge partitions") {
-val n = 10
-val defaultParallelism = 3
-val numEdgePartitions = 4
-assert(defaultParallelism != numEdgePartitions)
-val conf = new SparkConf()
-  .set("spark.default.parallelism", defaultParallelism.toString)
-val sc = new SparkContext("local", "test", conf)
-val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
-  numEdgePartitions)
-val graph = Graph.fromEdgeTuples(edges, 1)
-val neighborAttrSums = graph.mapReduceTriplets[Int](
-  et => Iterator((et.dstId, et.srcAttr)), _ + _)
-assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
-  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

2014-09-04 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 8dd7690e2 - 4d3ab2925


[HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions

9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.

Author: Ankur Dave ankurd...@gmail.com

Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:

10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD 
zipPartitions

(cherry picked from commit 00362dac976cd05b06638deb11d990d612429e0b)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d3ab292
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d3ab292
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d3ab292

Branch: refs/heads/branch-1.0
Commit: 4d3ab292576396e11d9e5c2dabb676c07b34d286
Parents: 8dd7690
Author: Ankur Dave ankurd...@gmail.com
Authored: Wed Sep 3 23:49:47 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Sep 3 23:50:22 2014 -0700

--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 
 2 files changed, 2 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d3ab292/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 4dd15bf..a8fc095 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark._
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -45,7 +45,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-partitionsRDD.partitioner.orElse(Some(new 
HashPartitioner(partitionsRDD.partitions.size)))
+
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
 val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, 
context)

http://git-wip-us.apache.org/repos/asf/spark/blob/4d3ab292/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index e1b83c2..abc25d0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -326,19 +325,4 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
-  test("non-default number of edge partitions") {
-val n = 10
-val defaultParallelism = 3
-val numEdgePartitions = 4
-assert(defaultParallelism != numEdgePartitions)
-val conf = new SparkConf()
-  .set("spark.default.parallelism", defaultParallelism.toString)
-val sc = new SparkContext("local", "test", conf)
-val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
-  numEdgePartitions)
-val graph = Graph.fromEdgeTuples(edges, 1)
-val neighborAttrSums = graph.mapReduceTriplets[Int](
-  et => Iterator((et.dstId, et.srcAttr)), _ + _)
-assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
-  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720

2014-09-03 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 6481d2742 - e5d376801


[SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR 
#720

PR #720 made multiple changes to GraphGenerator.logNormalGraph including:

* Replacing the calls to the existing random vertex and edge generation functions with 
in-line implementations that use different equations. Based on reading the Pregel 
paper, I believe the in-line functions are incorrect.
* Hard-coding of RNG seeds, so the method now generates the same graph for a given 
number of vertices, edges, mu, and sigma -- the user is not able to override the seed 
or specify that it should be randomly generated.
* Backwards-incompatible change to logNormalGraph signature with introduction 
of new required parameter.
* Failed to update scala docs and programming guide for API changes
* Added a Synthetic Benchmark in the examples.

This PR:
* Removes the in-line calls and calls original vertex / edge generation 
functions again
* Adds an optional seed parameter for deterministic behavior (when desired); see the 
usage sketch after this list
* Keeps the number of partitions parameter that was added.
* Keeps compatibility with the synthetic benchmark example
* Maintains backwards-compatible API
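
A rough usage sketch of the restored API; the argument order (numVertices, numEParts, 
mu, sigma, seed) reflects my reading of the patch and should be checked against 
GraphGenerators.scala:

  // deterministic: same seed, same graph
  val g1 = GraphGenerators.logNormalGraph(sc, 100, 4, 4.0, 1.3, 12345L)
  // previous behaviour: seed chosen randomly when omitted
  val g2 = GraphGenerators.logNormalGraph(sc, 100)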

Author: RJ Nowling rnowl...@gmail.com
Author: Ankur Dave ankurd...@gmail.com

Closes #2168 from rnowling/graphgenrand and squashes the following commits:

f1cd79f [Ankur Dave] Style fixes
e11918e [RJ Nowling] Fix bad comparisons in unit tests
785ac70 [RJ Nowling] Fix style error
c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed
41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed
799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal
43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges
2faf75f [RJ Nowling] Added unit test for logNormalGraph
82f22397 [RJ Nowling] Add unit test for sampleLogNormal
b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for 
unit testing
6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges
1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to 
call count on RDD before printing
dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs
b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set 
default to randomly generate a seed
1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead 
of numEdges was used to control number of edges to generate
98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters
d40141a [RJ Nowling] Fix style error
684804d [RJ Nowling] revert PR #720 which introduce errors in logNormalGraph 
and messed up seeding of RNGs.  Add user-defined optional seed for 
deterministic behavior
c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that 
allows generating graphs randomly using optional seed.
015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make 
backward-incompatible change in commit 894ecde04


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5d37680
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5d37680
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5d37680

Branch: refs/heads/master
Commit: e5d376801d57dffb0791980a1786a0a9b45bc491
Parents: 6481d27
Author: RJ Nowling rnowl...@gmail.com
Authored: Wed Sep 3 14:15:22 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Sep 3 14:16:06 2014 -0700

--
 .../spark/examples/graphx/SynthBenchmark.scala  |   9 +-
 .../spark/graphx/util/GraphGenerators.scala |  65 ++-
 .../graphx/util/GraphGeneratorsSuite.scala  | 110 +++
 3 files changed, 152 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e5d37680/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala 
b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index 551c339..5f35a58 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -38,12 +38,13 @@ object SynthBenchmark {
* Options:
*   -app pagerank or cc for pagerank or connected components. (Default: 
pagerank)
*   -niters the number of iterations of pagerank to use (Default: 10)
-   *   -numVertices the number of vertices in the graph (Default: 100)
+   *   -nverts the number of vertices in the graph (Default: 100)
*   -numEPart the number of edge partitions in the graph (Default: number 
of cores)
*   -partStrategy the graph partitioning 

git commit: [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 644e31524 - 7c92b49d6


[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples

to support ~/spark/bin/run-example GraphXAnalytics triangles
/soc-LiveJournal1.txt --numEPart=256

Author: Larry Xiao xia...@sjtu.edu.cn

Closes #1766 from larryxiao/1986 and squashes the following commits:

bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to 
org.apache.spark.examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92b49d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92b49d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92b49d

Branch: refs/heads/master
Commit: 7c92b49d6b62f88fcde883aacb60c5e32ae54b30
Parents: 644e315
Author: Larry Xiao xia...@sjtu.edu.cn
Authored: Tue Sep 2 18:29:08 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 18:29:08 2014 -0700

--
 .../spark/examples/graphx/Analytics.scala   | 162 +++
 .../examples/graphx/LiveJournalPageRank.scala   |   2 +-
 .../org/apache/spark/graphx/lib/Analytics.scala | 161 --
 3 files changed, 163 insertions(+), 162 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala 
b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
new file mode 100644
index 000..c4317a6
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import scala.collection.mutable
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.PartitionStrategy._
+
+/**
+ * Driver program for running graph algorithms.
+ */
+object Analytics extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
+      System.exit(1)
+    }
+
+    val taskType = args(0)
+    val fname = args(1)
+    val optionsList = args.drop(2).map { arg =>
+      arg.dropWhile(_ == '-').split('=') match {
+        case Array(opt, v) => (opt -> v)
+        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+      }
+    }
+    val options = mutable.Map(optionsList: _*)
+
+    def pickPartitioner(v: String): PartitionStrategy = {
+      // TODO: Use reflection rather than listing all the partitioning strategies here.
+      v match {
+        case "RandomVertexCut" => RandomVertexCut
+        case "EdgePartition1D" => EdgePartition1D
+        case "EdgePartition2D" => EdgePartition2D
+        case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+        case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
+      }
+    }
+
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+      .set("spark.locality.wait", "10")
+
+    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+      println("Set the number of edge partitions using --numEPart.")
+      sys.exit(1)
+    }
+    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+      .map(pickPartitioner(_))
+    val edgeStorageLevel = options.remove("edgeStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+    val vertexStorageLevel = options.remove("vertexStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
+    taskType match {
+      case "pagerank" =>
+        val tol = 

git commit: [SPARK-3123][GraphX]: override the setName function to set EdgeRDD's name manually just as VertexRDD does.

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 7c92b49d6 - 7c9bbf172


[SPARK-3123][GraphX]: override the setName function to set EdgeRDD's name 
manually just as VertexRDD does.
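
A brief usage sketch (the input path is illustrative): with this override, the name set 
on the EdgeRDD wrapper is pushed down to the underlying partitionsRDD, which is the RDD 
that actually appears in the storage UI.

  val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
  graph.edges.setName("follower-edges")
  graph.edges.cache()
  graph.edges.count()   // the partitionsRDD now carries the custom name instead of the default "EdgeRDD"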

Author: uncleGen husty...@gmail.com

Closes #2033 from uncleGen/master_origin and squashes the following commits:

801994b [uncleGen] Update EdgeRDD.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c9bbf17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c9bbf17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c9bbf17

Branch: refs/heads/master
Commit: 7c9bbf172512701c75992671bcb2f4b6d9e5034b
Parents: 7c92b49
Author: uncleGen husty...@gmail.com
Authored: Tue Sep 2 18:41:54 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 18:44:58 2014 -0700

--
 .../src/main/scala/org/apache/spark/graphx/EdgeRDD.scala  | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c9bbf17/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cb..5bcb96b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -37,7 +37,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
 val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[Edge[ED]](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {
 
-  partitionsRDD.setName("EdgeRDD")
+  override def setName(_name: String): this.type = {
+    if (partitionsRDD.name != null) {
+      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+    } else {
+      partitionsRDD.setName(_name)
+    }
+    this
+  }
+  setName("EdgeRDD")
 
   override protected def getPartitions: Array[Partition] = 
partitionsRDD.partitions
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 7c9bbf172 - aa7de128c


[SPARK-2981][GraphX] EdgePartition1D Int overflow

minor fix
detail is here: https://issues.apache.org/jira/browse/SPARK-2981
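
A worked illustration of the overflow, with a made-up partition count of 64: truncating 
the Long product to Int before taking the modulo can produce a negative partition id.

  val mixingPrime: Long = 1125899906842597L               // = 2^50 - 27
  val src: Long = 3L
  val buggy = (math.abs(src) * mixingPrime).toInt % 64    // .toInt keeps the low 32 bits (-81); -81 % 64 == -17
  val fixed = (math.abs(src * mixingPrime) % 64).toInt    // modulo on the Long first: 47, a valid partition id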

Author: Larry Xiao xia...@sjtu.edu.cn

Closes #1902 from larryxiao/2981 and squashes the following commits:

88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa7de128
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa7de128
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa7de128

Branch: refs/heads/master
Commit: aa7de128c5987fd2e134736f07ae913ad1f5eb26
Parents: 7c9bbf1
Author: Larry Xiao xia...@sjtu.edu.cn
Authored: Tue Sep 2 18:50:52 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 18:50:52 2014 -0700

--
 .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa7de128/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 5e7e72a..13033fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -91,7 +91,7 @@ object PartitionStrategy {
   case object EdgePartition1D extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: 
PartitionID): PartitionID = {
   val mixingPrime: VertexId = 1125899906842597L
-  (math.abs(src) * mixingPrime).toInt % numParts
+  (math.abs(src * mixingPrime) % numParts).toInt
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 7267e402c - 9b0cff2d4


[SPARK-2981][GraphX] EdgePartition1D Int overflow

minor fix
detail is here: https://issues.apache.org/jira/browse/SPARK-2981

Author: Larry Xiao xia...@sjtu.edu.cn

Closes #1902 from larryxiao/2981 and squashes the following commits:

88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow

(cherry picked from commit aa7de128c5987fd2e134736f07ae913ad1f5eb26)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b0cff2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b0cff2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b0cff2d

Branch: refs/heads/branch-1.1
Commit: 9b0cff2d45027cc348f5c5dd095d137368457779
Parents: 7267e40
Author: Larry Xiao xia...@sjtu.edu.cn
Authored: Tue Sep 2 18:50:52 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 18:51:03 2014 -0700

--
 .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b0cff2d/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 5e7e72a..13033fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -91,7 +91,7 @@ object PartitionStrategy {
   case object EdgePartition1D extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: 
PartitionID): PartitionID = {
   val mixingPrime: VertexId = 1125899906842597L
-  (math.abs(src) * mixingPrime).toInt % numParts
+  (math.abs(src * mixingPrime) % numParts).toInt
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 5481196ab - d60f60ccc


[SPARK-2981][GraphX] EdgePartition1D Int overflow

minor fix
detail is here: https://issues.apache.org/jira/browse/SPARK-2981

Author: Larry Xiao xia...@sjtu.edu.cn

Closes #1902 from larryxiao/2981 and squashes the following commits:

88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow

(cherry picked from commit aa7de128c5987fd2e134736f07ae913ad1f5eb26)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60f60cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60f60cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60f60cc

Branch: refs/heads/branch-1.0
Commit: d60f60ccc73ff717e2811a549a2a9ed5bdfd405b
Parents: 5481196
Author: Larry Xiao xia...@sjtu.edu.cn
Authored: Tue Sep 2 18:50:52 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 18:51:16 2014 -0700

--
 .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d60f60cc/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 1526cce..a1ab199 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -91,7 +91,7 @@ object PartitionStrategy {
   case object EdgePartition1D extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: 
PartitionID): PartitionID = {
   val mixingPrime: VertexId = 1125899906842597L
-  (math.abs(src) * mixingPrime).toInt % numParts
+  (math.abs(src * mixingPrime) % numParts).toInt
 }
   }
 





git commit: [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0c8183cb3 -> ffdb2fcf8


[SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" and the value differs from the 
EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions

Author: luluorta luluo...@gmail.com

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions

(cherry picked from commit 9b225ac3072de522b40b46aba6df1f1c231f13ef)
Signed-off-by: Ankur Dave ankurd...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffdb2fcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffdb2fcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffdb2fcf

Branch: refs/heads/branch-1.1
Commit: ffdb2fcf8cd5880375bee52ee101e0373bf63e27
Parents: 0c8183c
Author: luluorta luluo...@gmail.com
Authored: Tue Sep 2 19:25:52 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 19:28:57 2014 -0700

--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cb..0f1a101 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+partitionsRDD.partitioner.orElse(Some(new 
HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
 val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, 
context)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac..eaaa449 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("non-default number of edge partitions") {
+val n = 10
+val defaultParallelism = 3
+val numEdgePartitions = 4
+assert(defaultParallelism != numEdgePartitions)
+val conf = new SparkConf()
+  .set("spark.default.parallelism", defaultParallelism.toString)
+val sc = new SparkContext("local", "test", conf)
+val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+  numEdgePartitions)
+val graph = Graph.fromEdgeTuples(edges, 1)
+val neighborAttrSums = graph.mapReduceTriplets[Int](
+  et => Iterator((et.dstId, et.srcAttr)), _ + _)
+assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }

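For context, a minimal standalone sketch (hypothetical object name; not part of the Spark sources) of why the old partitioner choice mismatches the edge partitions: Partitioner.defaultPartitioner consults spark.default.parallelism, so with a parallelism of 3 and 4 partitions in partitionsRDD the two numbers disagree, while a HashPartitioner sized from partitionsRDD always matches.

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object DefaultPartitionerMismatchDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local").setAppName("partitioner-mismatch")
      .set("spark.default.parallelism", "3")
    val sc = new SparkContext(conf)

    // Stand-in for the EdgeRDD's partitionsRDD: 4 partitions, no partitioner.
    val partitionsRDD = sc.parallelize(1 to 10, 4)

    // Old choice: sized from spark.default.parallelism (3 partitions here).
    val oldChoice = Partitioner.defaultPartitioner(partitionsRDD)
    // New choice: sized from partitionsRDD itself (4 partitions here).
    val newChoice = new HashPartitioner(partitionsRDD.partitions.size)

    println(s"old = ${oldChoice.numPartitions}, new = ${newChoice.numPartitions}")
    sc.stop()
  }
}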




git commit: [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions

2014-09-02 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master e9bb12bea -> 9b225ac30


[SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" and the value differs from the 
EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of 
partitions

Author: luluorta luluo...@gmail.com

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b225ac3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b225ac3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b225ac3

Branch: refs/heads/master
Commit: 9b225ac3072de522b40b46aba6df1f1c231f13ef
Parents: e9bb12b
Author: luluorta luluo...@gmail.com
Authored: Tue Sep 2 19:25:52 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Sep 2 19:26:27 2014 -0700

--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b225ac3/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 5bcb96b..35fbd47 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, 
TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
   override val partitioner =
-
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+partitionsRDD.partitioner.orElse(Some(new 
HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
 val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, 
context)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b225ac3/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac..eaaa449 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
   }
 
+  test("non-default number of edge partitions") {
+val n = 10
+val defaultParallelism = 3
+val numEdgePartitions = 4
+assert(defaultParallelism != numEdgePartitions)
+val conf = new SparkConf()
+  .set("spark.default.parallelism", defaultParallelism.toString)
+val sc = new SparkContext("local", "test", conf)
+val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+  numEdgePartitions)
+val graph = Graph.fromEdgeTuples(edges, 1)
+val neighborAttrSums = graph.mapReduceTriplets[Int](
+  et => Iterator((et.dstId, et.srcAttr)), _ + _)
+assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }





svn commit: r1607545 - in /spark: images/graphx-perf-comparison.png site/images/graphx-perf-comparison.png

2014-07-03 Thread ankurdave
Author: ankurdave
Date: Thu Jul  3 07:08:46 2014
New Revision: 1607545

URL: http://svn.apache.org/r1607545
Log:
Correct the GraphX performance comparison graphic

Modified:
spark/images/graphx-perf-comparison.png
spark/site/images/graphx-perf-comparison.png

Modified: spark/images/graphx-perf-comparison.png
URL: 
http://svn.apache.org/viewvc/spark/images/graphx-perf-comparison.png?rev=1607545&r1=1607544&r2=1607545&view=diff
==
Binary files - no diff available.

Modified: spark/site/images/graphx-perf-comparison.png
URL: 
http://svn.apache.org/viewvc/spark/site/images/graphx-perf-comparison.png?rev=1607545&r1=1607544&r2=1607545&view=diff
==
Binary files - no diff available.




git commit: Minor: Fix documentation error from apache/spark#946

2014-06-04 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 11ded3f66 -> abea2d4ff


Minor: Fix documentation error from apache/spark#946

Author: Ankur Dave ankurd...@gmail.com

Closes #970 from ankurdave/SPARK-1991_docfix and squashes the following commits:

6d07343 [Ankur Dave] Minor: Fix documentation error from apache/spark#946


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abea2d4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abea2d4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abea2d4f

Branch: refs/heads/master
Commit: abea2d4ff099036c67fc73136d0e61d0d0e22123
Parents: 11ded3f
Author: Ankur Dave ankurd...@gmail.com
Authored: Wed Jun 4 16:45:53 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Jun 4 16:45:53 2014 -0700

--
 graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abea2d4f/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 2e814e3..f4c7936 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -49,8 +49,8 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
*direction
* @param minEdgePartitions the number of partitions for the edge RDD
-   * @param edgeStorageLevel the desired storage level for the edge 
partitions. To set the vertex
-   *storage level, call 
[[org.apache.spark.graphx.Graph#persistVertices]].
+   * @param edgeStorageLevel the desired storage level for the edge partitions
+   * @param vertexStorageLevel the desired storage level for the vertex 
partitions
*/
   def edgeListFile(
   sc: SparkContext,


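As a usage note, a hedged sketch (hypothetical object name and file path; local mode assumed) of calling edgeListFile with the two storage-level parameters this docfix documents:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

object EdgeListLoadDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("edge-list-demo"))
    // "edges.txt" is a hypothetical whitespace-separated edge list,
    // one "srcId dstId" pair per line.
    val graph = GraphLoader.edgeListFile(
      sc, "edges.txt",
      canonicalOrientation = false,
      minEdgePartitions = 4,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
    println(s"edges = ${graph.edges.count()}, vertices = ${graph.vertices.count()}")
    sc.stop()
  }
}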

git commit: Synthetic GraphX Benchmark

2014-06-03 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master aa41a522d -> 894ecde04


Synthetic GraphX Benchmark

This PR accomplishes two things:

1. It introduces a Synthetic Benchmark application that generates an 
arbitrarily large log-normal graph and executes either PageRank or connected 
components on the graph.  This can be used to profile the GraphX system on 
arbitrary clusters without access to large graph datasets.

2. This PR improves the implementation of the log-normal graph generator.

Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com
Author: Ankur Dave ankurd...@gmail.com

Closes #720 from jegonzal/graphx_synth_benchmark and squashes the following 
commits:

e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
bad [Ankur Dave] Fix long lines
374678a [Ankur Dave] Bugfix and style changes
1bdf39a [Joseph E. Gonzalez] updating options
d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples 
folder.
f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/894ecde0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/894ecde0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/894ecde0

Branch: refs/heads/master
Commit: 894ecde04faa7e2054a40825a58b2e9cdaa93c70
Parents: aa41a52
Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com
Authored: Tue Jun 3 14:14:48 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Jun 3 14:14:48 2014 -0700

--
 .../spark/examples/graphx/SynthBenchmark.scala  | 128 +++
 .../apache/spark/graphx/PartitionStrategy.scala |   9 ++
 .../spark/graphx/util/GraphGenerators.scala |  41 --
 project/MimaExcludes.scala  |   4 +-
 4 files changed, 171 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala 
b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
new file mode 100644
index 000..551c339
--- /dev/null
+++ 
b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms 
on
+ * synthetic log-normal graphs.  The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+  /**
+   * To run this program use the following:
+   *
+   * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+   *
+   * Options:
+   *   -app pagerank or cc for pagerank or connected components. (Default: 
pagerank)
+   *   -niters the number of iterations of pagerank to use (Default: 10)
+   *   -numVertices the number of vertices in the graph (Default: 100)
+   *   -numEPart the number of edge partitions in the graph (Default: number 
of cores)
+   *   -partStrategy the graph partitioning strategy to use
+   *   -mu the mean parameter for the log-normal graph (Default: 4.0)
+   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+   *   -degFile the local file to save the degree information (Default: Empty)
+   */
+  def main(args: Array[String]) {
+val options = args.map {
+  arg =>
+arg.dropWhile(_ == '-').split('=') match {
+  case Array(opt, v) => (opt -> v)
+  case _ => throw new IllegalArgumentException("Invalid argument: " + 
arg)
+}
+}
+
+

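To show what the parsing above produces, a small standalone sketch (hypothetical object name; sample arguments only) of the same "-opt=value" pattern:

object SynthBenchmarkOptionDemo {
  def main(args: Array[String]): Unit = {
    val sample = Array("-app=pagerank", "-niters=5", "-numVertices=1000")
    // Same parsing pattern as SynthBenchmark: strip leading dashes,
    // split on '=', and collect (option, value) pairs.
    val options = sample.map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    // Prints: app = pagerank, niters = 5, numVertices = 1000
    options.foreach { case (k, v) => println(s"$k = $v") }
  }
}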
git commit: Enable repartitioning of graph over different number of partitions

2014-06-03 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master e8d93ee52 -> 5284ca78d


Enable repartitioning of graph over different number of partitions

It is currently very difficult to repartition a graph over a different number 
of partitions.  This PR adds an additional `partitionBy` function that takes 
the number of partitions.

Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com

Closes #719 from jegonzal/graph_partitioning_options and squashes the following 
commits:

730b405 [Joseph E. Gonzalez] adding an additional number of partitions option 
to partitionBy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5284ca78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5284ca78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5284ca78

Branch: refs/heads/master
Commit: 5284ca78d17fb4de9a7019f3bbecf86484c13763
Parents: e8d93ee
Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com
Authored: Tue Jun 3 20:49:14 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Jun 3 20:49:14 2014 -0700

--
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 10 ++
 .../scala/org/apache/spark/graphx/PartitionStrategy.scala |  8 +---
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala|  6 +-
 3 files changed, 20 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index c4f9d65..14ae50e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] 
protected () extends Serializab
 
   /**
* Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param the partitioning strategy to use when partitioning the edges in 
the graph.
*/
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
 
   /**
+   * Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param the partitioning strategy to use when partitioning the edges in 
the graph.
+   * @param numPartitions the number of edge partitions in the new graph.
+   */
+  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): 
Graph[VD, ED]
+
+  /**
* Transforms each vertex attribute in the graph using the map function.
*
* @note The new graph has the same structure.  As a consequence the 
underlying index structures

http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index ef412cf..5e7e72a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -114,9 +114,11 @@ object PartitionStrategy {
*/
   case object CanonicalRandomVertexCut extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: 
PartitionID): PartitionID = {
-  val lower = math.min(src, dst)
-  val higher = math.max(src, dst)
-  math.abs((lower, higher).hashCode()) % numParts
+  if (src < dst) {
+math.abs((src, dst).hashCode()) % numParts
+  } else {
+math.abs((dst, src).hashCode()) % numParts
+  }
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 59d9a88..15ea05c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, 
ED] = {
-val numPartitions = edges.partitions.size
+partitionBy(partitionStrategy, edges.partitions.size)
+  }
+
+  override def partitionBy(
+  partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] 
= {
 val edTag = classTag[ED]
 val vdTag = classTag[VD]
 val newEdges = 

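A hedged usage sketch (hypothetical object name and toy graph; runs in local mode) of the new two-argument partitionBy overload added here, alongside the existing one-argument form:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object RepartitionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("repartition-demo"))
    // A small chain graph: vertex i points to vertex i + 1.
    val edges = sc.parallelize((1L until 100L).map(i => (i, i + 1)))
    val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

    // Existing overload: repartition edges, keeping the current partition count.
    val sameCount = graph.partitionBy(PartitionStrategy.EdgePartition2D)
    // New overload: also change the number of edge partitions.
    val sixteen = graph.partitionBy(PartitionStrategy.EdgePartition2D, 16)

    println(s"before = ${graph.edges.partitions.length}, " +
      s"after = ${sixteen.edges.partitions.length}")
    sc.stop()
  }
}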
git commit: initial version of LPA

2014-05-29 Thread ankurdave
Repository: spark
Updated Branches:
  refs/heads/master 8f7141fbc -> b7e28fa45


initial version of LPA

A straightforward implementation of the LPA algorithm for detecting graph 
communities using the Pregel framework.  Amongst the growing literature on 
community detection algorithms in networks, LPA is perhaps the most elementary, 
and despite its flaws it remains a nice and simple approach.

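A hedged usage sketch (hypothetical object name and toy graph; runs in local mode) of the new LabelPropagation.run entry point, applied to two triangles joined by a bridge edge:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.LabelPropagation

object LabelPropagationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("lpa-demo"))
    // Two triangles joined by one bridge edge; LPA should tend to assign
    // each triangle its own community label.
    val edges = sc.parallelize(Seq(
      (1L, 2L), (2L, 3L), (3L, 1L),   // first triangle
      (4L, 5L), (5L, 6L), (6L, 4L),   // second triangle
      (3L, 4L)))                      // bridge
    val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)
    val communities = LabelPropagation.run(graph, maxSteps = 5)
    communities.vertices.collect.sorted.foreach { case (id, label) =>
      println(s"vertex $id -> community $label")
    }
    sc.stop()
  }
}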
Author: Ankur Dave ankurd...@gmail.com
Author: haroldsultan haroldsul...@gmail.com
Author: Harold Sultan haroldsul...@gmail.com

Closes #905 from haroldsultan/master and squashes the following commits:

327aee0 [haroldsultan] Merge pull request #2 from ankurdave/label-propagation
227a4d0 [Ankur Dave] Untabify
0ac574c [haroldsultan] Merge pull request #1 from ankurdave/label-propagation
0e24303 [Ankur Dave] Add LabelPropagationSuite
84aa061 [Ankur Dave] LabelPropagation: Fix compile errors and style; rename 
from LPA
9830342 [Harold Sultan] initial version of LPA


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7e28fa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7e28fa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7e28fa4

Branch: refs/heads/master
Commit: b7e28fa451511b3b0f849c3d2919ac9c2e4231a1
Parents: 8f7141f
Author: Ankur Dave ankurd...@gmail.com
Authored: Thu May 29 15:39:25 2014 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu May 29 15:39:25 2014 -0700

--
 .../spark/graphx/lib/LabelPropagation.scala | 66 
 .../graphx/lib/LabelPropagationSuite.scala  | 45 +
 2 files changed, 111 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b7e28fa4/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
--
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
new file mode 100644
index 000..776bfb8
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import scala.reflect.ClassTag
+import org.apache.spark.graphx._
+
+/** Label Propagation algorithm. */
+object LabelPropagation {
+  /**
+   * Run static Label Propagation for detecting communities in networks.
+   *
+   * Each node in the network is initially assigned to its own community. At 
every superstep, nodes
+   * send their community affiliation to all neighbors and update their state 
to the mode community
+   * affiliation of incoming messages.
+   *
+   * LPA is a standard community detection algorithm for graphs. It is very 
inexpensive
+   * computationally, although (1) convergence is not guaranteed and (2) one 
can end up with
+   * trivial solutions (all nodes are identified into a single community).
+   *
+   * @tparam ED the edge attribute type (not used in the computation)
+   *
+   * @param graph the graph for which to compute the community affiliation
+   * @param maxSteps the number of supersteps of LPA to be performed. Because 
this is a static
+   * implementation, the algorithm will run for exactly this many supersteps.
+   *
+   * @return a graph with vertex attributes containing the label of community 
affiliation
+   */
+  def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, 
ED] = {
+val lpaGraph = graph.mapVertices { case (vid, _) => vid }
+def sendMessage(e: EdgeTriplet[VertexId, ED]) = {
+  Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 
1L)))
+}
+def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
+  : Map[VertexId, Long] = {
+  (count1.keySet ++ count2.keySet).map { i =>
+val count1Val = count1.getOrElse(i, 0L)
+val count2Val = count2.getOrElse(i, 0L)
+i -> (count1Val + count2Val)
+  }.toMap
+}
+def vertexProgram(vid: VertexId