Updated Branches: refs/heads/trunk a5bc5bb35 -> 2b95451e1
GIRAPH-459: Group Vertex Mutations by Partition ID

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2b95451e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2b95451e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2b95451e

Branch: refs/heads/trunk
Commit: 2b95451e16fb8e29636fc625fbcd817ec7b4adb2
Parents: a5bc5bb
Author: Claudio Martella <[email protected]>
Authored: Fri Jan 11 03:46:00 2013 +0100
Committer: Claudio Martella <[email protected]>
Committed: Fri Jan 11 03:46:00 2013 +0100

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/comm/netty/NettyWorkerServer.java    | 128 ++++++++-------
 2 files changed, 68 insertions(+), 62 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/2b95451e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d365bfa..78f01db 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-459: Group Vertex Mutations by Partition ID (claudio)
+
   GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman)
 
   GIRAPH-478: Bring back jar-with-deps for giraph-hcatalog (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/2b95451e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 82ef831..e2866fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -43,10 +43,13 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 
 import java.net.InetSocketAddress;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Map.Entry;
 
 /**
  * Netty worker server that implement {@link WorkerServer} and contains
@@ -152,79 +155,80 @@ public class NettyWorkerServer<I extends WritableComparable,
 
   @Override
   public void resolveMutations(GraphState<I, V, E, M> graphState) {
-    Set<I> resolveVertexIndexSet = Sets.newHashSet();
+    Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
+        service.getPartitionStore().getNumPartitions(), 100);
+    // Add any mutated vertex indices to be resolved
+    for (Entry<I, VertexMutations<I, V, E, M>> e :
+        serverData.getVertexMutations().entrySet()) {
+      I vertexId = e.getKey();
+      Integer partitionId = service.getPartitionId(vertexId);
+      if (!resolveVertexIndices.put(partitionId, vertexId)) {
+        throw new IllegalStateException(
+            "resolveMutations: Already has missing vertex on this " +
+            "worker for " + vertexId);
+      }
+    }
     // Keep track of the vertices which are not here but have received messages
     for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
-      for (I vertexId : serverData.getCurrentMessageStore().
-          getPartitionDestinationVertices(partitionId)) {
-        Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
-        if (vertex == null) {
-          if (!resolveVertexIndexSet.add(vertexId)) {
-            throw new IllegalStateException(
-                "prepareSuperstep: Already has missing vertex on this " +
-                "worker for " + vertexId);
+      Iterable<I> destinations = serverData.getCurrentMessageStore().
+          getPartitionDestinationVertices(partitionId);
+      if (!Iterables.isEmpty(destinations)) {
+        Partition<I, V, E, M> partition =
+            service.getPartitionStore().getPartition(partitionId);
+        for (I vertexId : destinations) {
+          if (partition.getVertex(vertexId) == null) {
+            if (!resolveVertexIndices.put(partitionId, vertexId)) {
+              throw new IllegalStateException(
+                  "resolveMutations: Already has missing vertex on this " +
+                  "worker for " + vertexId);
+            }
           }
         }
       }
     }
-
-    // Add any mutated vertex indices to be resolved
-    for (I vertexIndex : serverData.getVertexMutations().keySet()) {
-      if (!resolveVertexIndexSet.add(vertexIndex)) {
-        throw new IllegalStateException(
-            "prepareSuperstep: Already has missing vertex on this " +
-            "worker for " + vertexIndex);
-      }
-    }
-
     // Resolve all graph mutations
     VertexResolver<I, V, E, M> vertexResolver =
         conf.createVertexResolver(graphState);
-    for (I vertexIndex : resolveVertexIndexSet) {
-      Vertex<I, V, E, M> originalVertex =
-          service.getVertex(vertexIndex);
-
-      VertexMutations<I, V, E, M> mutations = null;
-      VertexMutations<I, V, E, M> vertexMutations =
-          serverData.getVertexMutations().get(vertexIndex);
-      if (vertexMutations != null) {
-        synchronized (vertexMutations) {
-          mutations = vertexMutations.copy();
-        }
-        serverData.getVertexMutations().remove(vertexIndex);
-      }
-      Vertex<I, V, E, M> vertex = vertexResolver.resolve(
-          vertexIndex, originalVertex, mutations,
-          serverData.getCurrentMessageStore().
-          hasMessagesForVertex(vertexIndex));
-      graphState.getContext().progress();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("prepareSuperstep: Resolved vertex index " +
-            vertexIndex + " with original vertex " +
-            originalVertex + ", returned vertex " + vertex +
-            " on superstep " + service.getSuperstep() +
-            " with mutations " +
-            mutations);
-      }
-
+    for (Entry<Integer, Collection<I>> e :
+        resolveVertexIndices.asMap().entrySet()) {
       Partition<I, V, E, M> partition =
-          service.getPartition(vertexIndex);
-      if (partition == null) {
-        throw new IllegalStateException(
-            "prepareSuperstep: No partition for index " + vertexIndex +
-            " in " + service.getPartitionStore() + " should have been " +
-            service.getVertexPartitionOwner(vertexIndex));
-      }
-      if (vertex != null) {
-        partition.putVertex(vertex);
-      } else if (originalVertex != null) {
-        partition.removeVertex(originalVertex.getId());
+          service.getPartitionStore().getPartition(e.getKey());
+      for (I vertexIndex : e.getValue()) {
+        Vertex<I, V, E, M> originalVertex =
+            partition.getVertex(vertexIndex);
+
+        VertexMutations<I, V, E, M> mutations = null;
+        VertexMutations<I, V, E, M> vertexMutations =
+            serverData.getVertexMutations().get(vertexIndex);
+        if (vertexMutations != null) {
+          synchronized (vertexMutations) {
+            mutations = vertexMutations.copy();
+          }
+          serverData.getVertexMutations().remove(vertexIndex);
+        }
+        Vertex<I, V, E, M> vertex = vertexResolver.resolve(
+            vertexIndex, originalVertex, mutations,
+            serverData.getCurrentMessageStore().
+            hasMessagesForVertex(vertexIndex));
+        graphState.getContext().progress();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("resolveMutations: Resolved vertex index " +
+              vertexIndex + " with original vertex " +
+              originalVertex + ", returned vertex " + vertex +
+              " on superstep " + service.getSuperstep() +
+              " with mutations " +
+              mutations);
+        }
+        if (vertex != null) {
+          partition.putVertex(vertex);
+        } else if (originalVertex != null) {
+          partition.removeVertex(originalVertex.getId());
+        }
       }
     }
-
     if (!serverData.getVertexMutations().isEmpty()) {
-      throw new IllegalStateException("prepareSuperstep: Illegally " +
+      throw new IllegalStateException("resolveMutations: Illegally " +
           "still has " + serverData.getVertexMutations().size() +
           " mutations left.");
     }
