Repository: spark
Updated Branches:
  refs/heads/master aa43a8da0 -> 0402be90f


Internal cleanup for aggregateMessages

1. Add EdgeActiveness enum to represent activeness criteria more cleanly than 
using booleans.
2. Comments and whitespace.

Author: Ankur Dave <[email protected]>

Closes #3231 from ankurdave/aggregateMessages-followup and squashes the 
following commits:

3d485c3 [Ankur Dave] Internal cleanup for aggregateMessages


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0402be90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0402be90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0402be90

Branch: refs/heads/master
Commit: 0402be90f7af82c8404cafbca79f5f9fb8e2bbed
Parents: aa43a8d
Author: Ankur Dave <[email protected]>
Authored: Wed Nov 12 13:44:49 2014 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Nov 12 13:44:49 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Graph.scala   |  3 +-
 .../spark/graphx/impl/EdgeActiveness.java       | 34 +++++++++++++
 .../spark/graphx/impl/EdgePartition.scala       | 52 ++++++++++----------
 .../apache/spark/graphx/impl/GraphImpl.scala    | 14 +++---
 4 files changed, 69 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0402be90/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index e0ba940..2c1b951 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapTriplets[ED2: ClassTag](
-      map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
     mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0402be90/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
new file mode 100644
index 0000000..377ae84
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl;
+
+/**
+ * Criteria for filtering edges based on activeness. For internal use only.
+ */
+public enum EdgeActiveness {
+  /** Neither the source vertex nor the destination vertex need be active. */
+  Neither,
+  /** The source vertex must be active. */
+  SrcOnly,
+  /** The destination vertex must be active. */
+  DstOnly,
+  /** Both vertices must be active. */
+  Both,
+  /** At least one vertex must be active. */
+  Either
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0402be90/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 78d8ac2..373af75 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -64,6 +64,7 @@ class EdgePartition[
     activeSet: Option[VertexSet])
   extends Serializable {
 
+  /** No-arg constructor for serialization. */
   private def this() = this(null, null, null, null, null, null, null, null)
 
   /** Return a new `EdgePartition` with the specified edge data. */
@@ -375,12 +376,7 @@ class EdgePartition[
    * @param sendMsg generates messages to neighboring vertices of an edge
    * @param mergeMsg the combiner applied to messages destined to the same vertex
    * @param tripletFields which triplet fields `sendMsg` uses
-   * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
-   *   active set
-   * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
-   *   the active set
-   * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
-   *   considered
+   * @param activeness criteria for filtering edges based on activeness
    *
    * @return iterator aggregated messages keyed by the receiving vertex id
    */
@@ -388,9 +384,7 @@ class EdgePartition[
       sendMsg: EdgeContext[VD, ED, A] => Unit,
       mergeMsg: (A, A) => A,
       tripletFields: TripletFields,
-      srcMustBeActive: Boolean,
-      dstMustBeActive: Boolean,
-      maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+      activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
     val aggregates = new Array[A](vertexAttrs.length)
     val bitset = new BitSet(vertexAttrs.length)
 
@@ -401,10 +395,13 @@ class EdgePartition[
       val srcId = local2global(localSrcId)
       val localDstId = localDstIds(i)
       val dstId = local2global(localDstId)
-      val srcIsActive = !srcMustBeActive || isActive(srcId)
-      val dstIsActive = !dstMustBeActive || isActive(dstId)
       val edgeIsActive =
-        if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+        if (activeness == EdgeActiveness.Neither) true
+        else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
+        else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+        else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
+        else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
+        else throw new Exception("unreachable")
       if (edgeIsActive) {
         val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
         val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
@@ -424,12 +421,7 @@ class EdgePartition[
    * @param sendMsg generates messages to neighboring vertices of an edge
    * @param mergeMsg the combiner applied to messages destined to the same vertex
    * @param tripletFields which triplet fields `sendMsg` uses
-   * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
-   *   active set
-   * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
-   *   the active set
-   * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
-   *   considered
+   * @param activeness criteria for filtering edges based on activeness
    *
    * @return iterator aggregated messages keyed by the receiving vertex id
    */
@@ -437,9 +429,7 @@ class EdgePartition[
       sendMsg: EdgeContext[VD, ED, A] => Unit,
       mergeMsg: (A, A) => A,
       tripletFields: TripletFields,
-      srcMustBeActive: Boolean,
-      dstMustBeActive: Boolean,
-      maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+      activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
     val aggregates = new Array[A](vertexAttrs.length)
     val bitset = new BitSet(vertexAttrs.length)
 
@@ -448,8 +438,16 @@ class EdgePartition[
       val clusterSrcId = cluster._1
       val clusterPos = cluster._2
       val clusterLocalSrcId = localSrcIds(clusterPos)
-      val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
-      if (srcIsActive || maySatisfyEither) {
+
+      val scanCluster =
+        if (activeness == EdgeActiveness.Neither) true
+        else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId)
+        else if (activeness == EdgeActiveness.DstOnly) true
+        else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId)
+        else if (activeness == EdgeActiveness.Either) true
+        else throw new Exception("unreachable")
+
+      if (scanCluster) {
         var pos = clusterPos
         val srcAttr =
           if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
@@ -457,9 +455,13 @@ class EdgePartition[
         while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
           val localDstId = localDstIds(pos)
           val dstId = local2global(localDstId)
-          val dstIsActive = !dstMustBeActive || isActive(dstId)
           val edgeIsActive =
-            if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+            if (activeness == EdgeActiveness.Neither) true
+            else if (activeness == EdgeActiveness.SrcOnly) true
+            else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+            else if (activeness == EdgeActiveness.Both) isActive(dstId)
+            else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId)
+            else throw new Exception("unreachable")
           if (edgeIsActive) {
             val dstAttr =
               if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]

http://git-wip-us.apache.org/repos/asf/spark/blob/0402be90/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index a1fe158..2b4636a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
           case Some(EdgeDirection.Both) =>
             if (activeFraction < 0.8) {
               edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
-                true, true, false)
+                EdgeActiveness.Both)
             } else {
               edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-                true, true, false)
+                EdgeActiveness.Both)
             }
           case Some(EdgeDirection.Either) =>
             // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
             // the index here. Instead we have to scan all edges and then do the filter.
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              true, true, true)
+              EdgeActiveness.Either)
           case Some(EdgeDirection.Out) =>
             if (activeFraction < 0.8) {
               edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
-                true, false, false)
+                EdgeActiveness.SrcOnly)
             } else {
               edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-                true, false, false)
+                EdgeActiveness.SrcOnly)
             }
           case Some(EdgeDirection.In) =>
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              false, true, false)
+              EdgeActiveness.DstOnly)
           case _ => // None
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              false, false, false)
+              EdgeActiveness.Neither)
         }
     }).setName("GraphImpl.aggregateMessages - preAgg")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to