Updated Branches:
  refs/heads/trunk a5bc5bb35 -> 2b95451e1

GIRAPH-459: Group Vertex Mutations by Partition ID


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2b95451e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2b95451e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2b95451e

Branch: refs/heads/trunk
Commit: 2b95451e16fb8e29636fc625fbcd817ec7b4adb2
Parents: a5bc5bb
Author: Claudio Martella <[email protected]>
Authored: Fri Jan 11 03:46:00 2013 +0100
Committer: Claudio Martella <[email protected]>
Committed: Fri Jan 11 03:46:00 2013 +0100

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../giraph/comm/netty/NettyWorkerServer.java       |  128 ++++++++-------
 2 files changed, 68 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/2b95451e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d365bfa..78f01db 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-459: Group Vertex Mutations by Partition ID (claudio)
+
   GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
 
   GIRAPH-478: Bring back jar-with-deps for giraph-hcatalog (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/2b95451e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 82ef831..e2866fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -43,10 +43,13 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 
 import java.net.InetSocketAddress;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Map.Entry;
 
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
@@ -152,79 +155,80 @@ public class NettyWorkerServer<I extends WritableComparable,
 
   @Override
   public void resolveMutations(GraphState<I, V, E, M> graphState) {
-    Set<I> resolveVertexIndexSet = Sets.newHashSet();
+    Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
+        service.getPartitionStore().getNumPartitions(), 100);
+      // Add any mutated vertex indices to be resolved
+    for (Entry<I, VertexMutations<I, V, E, M>> e :
+        serverData.getVertexMutations().entrySet()) {
+      I vertexId = e.getKey();
+      Integer partitionId = service.getPartitionId(vertexId);
+      if (!resolveVertexIndices.put(partitionId, vertexId)) {
+        throw new IllegalStateException(
+            "resolveMutations: Already has missing vertex on this " +
+                "worker for " + vertexId);
+      }
+    }
     // Keep track of the vertices which are not here but have received messages
     for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
-      for (I vertexId : serverData.getCurrentMessageStore().
-          getPartitionDestinationVertices(partitionId)) {
-        Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
-        if (vertex == null) {
-          if (!resolveVertexIndexSet.add(vertexId)) {
-            throw new IllegalStateException(
-                "prepareSuperstep: Already has missing vertex on this " +
-                    "worker for " + vertexId);
+      Iterable<I> destinations = serverData.getCurrentMessageStore().
+          getPartitionDestinationVertices(partitionId);
+      if (!Iterables.isEmpty(destinations)) {
+        Partition<I, V, E, M> partition =
+            service.getPartitionStore().getPartition(partitionId);
+        for (I vertexId : destinations) {
+          if (partition.getVertex(vertexId) == null) {
+            if (!resolveVertexIndices.put(partitionId, vertexId)) {
+              throw new IllegalStateException(
+                  "resolveMutations: Already has missing vertex on this " +
+                      "worker for " + vertexId);
+            }
           }
         }
       }
     }
-
-    // Add any mutated vertex indices to be resolved
-    for (I vertexIndex : serverData.getVertexMutations().keySet()) {
-      if (!resolveVertexIndexSet.add(vertexIndex)) {
-        throw new IllegalStateException(
-            "prepareSuperstep: Already has missing vertex on this " +
-                "worker for " + vertexIndex);
-      }
-    }
-
     // Resolve all graph mutations
     VertexResolver<I, V, E, M> vertexResolver =
         conf.createVertexResolver(graphState);
-    for (I vertexIndex : resolveVertexIndexSet) {
-      Vertex<I, V, E, M> originalVertex =
-          service.getVertex(vertexIndex);
-
-      VertexMutations<I, V, E, M> mutations = null;
-      VertexMutations<I, V, E, M> vertexMutations =
-          serverData.getVertexMutations().get(vertexIndex);
-      if (vertexMutations != null) {
-        synchronized (vertexMutations) {
-          mutations = vertexMutations.copy();
-        }
-        serverData.getVertexMutations().remove(vertexIndex);
-      }
-      Vertex<I, V, E, M> vertex = vertexResolver.resolve(
-          vertexIndex, originalVertex, mutations,
-          serverData.getCurrentMessageStore().
-              hasMessagesForVertex(vertexIndex));
-      graphState.getContext().progress();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("prepareSuperstep: Resolved vertex index " +
-            vertexIndex + " with original vertex " +
-            originalVertex + ", returned vertex " + vertex +
-            " on superstep " + service.getSuperstep() +
-            " with mutations " +
-            mutations);
-      }
-
+    for (Entry<Integer, Collection<I>> e :
+        resolveVertexIndices.asMap().entrySet()) {
       Partition<I, V, E, M> partition =
-          service.getPartition(vertexIndex);
-      if (partition == null) {
-        throw new IllegalStateException(
-            "prepareSuperstep: No partition for index " + vertexIndex +
-                " in " + service.getPartitionStore() + " should have been " +
-                service.getVertexPartitionOwner(vertexIndex));
-      }
-      if (vertex != null) {
-        partition.putVertex(vertex);
-      } else if (originalVertex != null) {
-        partition.removeVertex(originalVertex.getId());
+          service.getPartitionStore().getPartition(e.getKey());
+      for (I vertexIndex : e.getValue()) {
+        Vertex<I, V, E, M> originalVertex =
+            partition.getVertex(vertexIndex);
+
+        VertexMutations<I, V, E, M> mutations = null;
+        VertexMutations<I, V, E, M> vertexMutations =
+            serverData.getVertexMutations().get(vertexIndex);
+        if (vertexMutations != null) {
+          synchronized (vertexMutations) {
+            mutations = vertexMutations.copy();
+          }
+          serverData.getVertexMutations().remove(vertexIndex);
+        }
+        Vertex<I, V, E, M> vertex = vertexResolver.resolve(
+            vertexIndex, originalVertex, mutations,
+            serverData.getCurrentMessageStore().
+                hasMessagesForVertex(vertexIndex));
+        graphState.getContext().progress();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("resolveMutations: Resolved vertex index " +
+              vertexIndex + " with original vertex " +
+              originalVertex + ", returned vertex " + vertex +
+              " on superstep " + service.getSuperstep() +
+              " with mutations " +
+              mutations);
+        }
+        if (vertex != null) {
+          partition.putVertex(vertex);
+        } else if (originalVertex != null) {
+          partition.removeVertex(originalVertex.getId());
+        }
       }
     }
-
     if (!serverData.getVertexMutations().isEmpty()) {
-      throw new IllegalStateException("prepareSuperstep: Illegally " +
+      throw new IllegalStateException("resolveMutations: Illegally " +
           "still has " + serverData.getVertexMutations().size() +
           " mutations left.");
     }

Reply via email to