Updated Branches: refs/heads/trunk 85e7a4907 -> c14e524b6
GIRAPH-556: Race condition in EdgeStore (apresta)

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c14e524b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c14e524b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c14e524b

Branch: refs/heads/trunk
Commit: c14e524b6ac8829fae37eeedcbc78eaa5b9bedcc
Parents: 85e7a49
Author: Alessandro Presta <[email protected]>
Authored: Thu Mar 7 22:51:35 2013 -0800
Committer: Alessandro Presta <[email protected]>
Committed: Fri Mar 8 10:14:14 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |  2 +
 .../giraph/comm/WorkerClientRequestProcessor.java  |  8 -----
 .../netty/NettyWorkerClientRequestProcessor.java   |  5 ---
 .../conf/ImmutableClassesGiraphConfiguration.java  | 12 +++++++
 .../org/apache/giraph/edge/ArrayListEdges.java     | 16 ++++-----
 .../org/apache/giraph/edge/ByteArrayEdges.java     | 16 ++++-----
 .../java/org/apache/giraph/edge/EdgeStore.java     |  6 +--
 .../java/org/apache/giraph/edge/HashMapEdges.java  | 20 +++++-------
 .../apache/giraph/edge/LongDoubleArrayEdges.java   | 24 +++++++--------
 .../apache/giraph/edge/LongDoubleHashMapEdges.java | 22 ++++++-------
 .../org/apache/giraph/edge/LongNullArrayEdges.java | 24 +++++++--------
 .../apache/giraph/edge/LongNullHashSetEdges.java   | 22 ++++++-------
 .../main/java/org/apache/giraph/graph/Vertex.java  |  2 +-
 13 files changed, 82 insertions(+), 97 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7f5a113..3fdfb63 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-556: Race condition in EdgeStore (apresta)
+
   GIRAPH-528: Decouple vertex implementation from edge storage (apresta)
 
   GIRAPH-553: Cleanup
HCatalogVertexOutputFormat (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java index 5b82d82..bc0637f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java @@ -127,12 +127,4 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable, * @return Number of messages sent before the reset. */ long resetMessageCount(); - - /** - * Lookup PartitionOwner for a vertex. - * - * @param vertexId id to look up. - * @return PartitionOwner holding the vertex. - */ - PartitionOwner getVertexPartitionOwner(I vertexId); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index d1e99cf..e58030e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -460,11 +460,6 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, return messagesSentInSuperstep; } - @Override - public PartitionOwner getVertexPartitionOwner(I vertexId) { - return workerClient.getVertexPartitionOwner(vertexId); - } - /** * When doing the request, short circuit if it is 
local * http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 8457b8b..7075999 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -571,6 +571,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create a {@link VertexEdges} instance and initialize it with the default + * capacity. + * + * @return Instantiated VertexEdges + */ + public VertexEdges<I, E> createAndInitializeVertexEdges() { + VertexEdges<I, E> vertexEdges = createVertexEdges(); + vertexEdges.initialize(); + return vertexEdges; + } + + /** * Create a {@link VertexEdges} instance and initialize it with the given * capacity (the number of edges that will be added). 
* http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java index 68d4ec0..98b1aef 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/ArrayListEdges.java @@ -43,15 +43,13 @@ public class ArrayListEdges<I extends WritableComparable, E extends Writable> @Override public void initialize(Iterable<Edge<I, E>> edges) { - if (edges != null) { - // If the iterable is actually an instance of ArrayList, - // we simply copy the reference. - // Otherwise we have to add every edge. - if (edges instanceof ArrayList) { - edgeList = (ArrayList<Edge<I, E>>) edges; - } else { - edgeList = Lists.newArrayList(edges); - } + // If the iterable is actually an instance of ArrayList, + // we simply copy the reference. + // Otherwise we have to add every edge. 
+ if (edges instanceof ArrayList) { + edgeList = (ArrayList<Edge<I, E>>) edges; + } else { + edgeList = Lists.newArrayList(edges); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java index be74ad1..6201d25 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/ByteArrayEdges.java @@ -55,16 +55,14 @@ public class ByteArrayEdges<I extends WritableComparable, E extends Writable> public void initialize(Iterable<Edge<I, E>> edges) { ExtendedDataOutput extendedOutputStream = getConf().createExtendedDataOutput(); - if (edges != null) { - for (Edge<I, E> edge : edges) { - try { - WritableUtils.writeEdge(extendedOutputStream, edge); - } catch (IOException e) { - throw new IllegalStateException("initialize: Failed to serialize " + - edge); - } - ++edgeCount; + for (Edge<I, E> edge : edges) { + try { + WritableUtils.writeEdge(extendedOutputStream, edge); + } catch (IOException e) { + throw new IllegalStateException("initialize: Failed to serialize " + + edge); } + ++edgeCount; } serializedEdges = extendedOutputStream.getByteArray(); serializedEdgesBytesUsed = extendedOutputStream.getPos(); http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 64569bb..1f6e9bb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ 
-110,13 +110,11 @@ public class EdgeStore<I extends WritableComparable, vertexIdEdgeIterator.releaseCurrentEdge(); VertexEdges<I, E> vertexEdges = partitionEdges.get(vertexId); if (vertexEdges == null) { - VertexEdges<I, E> newVertexEdges = configuration.createVertexEdges(); + VertexEdges<I, E> newVertexEdges = + configuration.createAndInitializeVertexEdges(); vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges); if (vertexEdges == null) { vertexEdges = newVertexEdges; - // Only initialize the new vertex once we are sure it's going to be - // used. - vertexEdges.initialize(); // Since we had to use the vertex id as a new key in the map, // we need to release the object. vertexIdEdgeIterator.releaseCurrentVertexId(); http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java index 1aa9a46..2600992 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/HashMapEdges.java @@ -48,17 +48,15 @@ public class HashMapEdges<I extends WritableComparable, E extends Writable> @Override public void initialize(Iterable<Edge<I, E>> edges) { - if (edges != null) { - // If the iterable is actually a collection, we can cheaply get the - // size and initialize the hash-map with the expected capacity. - if (edges instanceof Collection) { - initialize(((Collection<Edge<I, E>>) edges).size()); - } else { - initialize(); - } - for (Edge<I, E> edge : edges) { - add(edge); - } + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the hash-map with the expected capacity. 
+ if (edges instanceof Collection) { + initialize(((Collection<Edge<I, E>>) edges).size()); + } else { + initialize(); + } + for (Edge<I, E> edge : edges) { + add(edge); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java index 9df58a9..f164484 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleArrayEdges.java @@ -49,19 +49,17 @@ public class LongDoubleArrayEdges @Override public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) { - if (edges != null) { - // If the iterable is actually a collection, we can cheaply get the - // size and initialize the arrays with the expected capacity. - if (edges instanceof Collection) { - int numEdges = - ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size(); - initialize(numEdges); - } else { - initialize(); - } - for (Edge<LongWritable, DoubleWritable> edge : edges) { - add(edge); - } + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the arrays with the expected capacity. 
+ if (edges instanceof Collection) { + int numEdges = + ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size(); + initialize(numEdges); + } else { + initialize(); + } + for (Edge<LongWritable, DoubleWritable> edge : edges) { + add(edge); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java index 6d17b4b..68bd85f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDoubleHashMapEdges.java @@ -50,18 +50,16 @@ public class LongDoubleHashMapEdges @Override public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) { - if (edges != null) { - // If the iterable is actually a collection, we can cheaply get the - // size and initialize the hash-map with the expected capacity. - if (edges instanceof Collection) { - initialize( - ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size()); - } else { - initialize(); - } - for (Edge<LongWritable, DoubleWritable> edge : edges) { - add(edge); - } + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the hash-map with the expected capacity. 
+ if (edges instanceof Collection) { + initialize( + ((Collection<Edge<LongWritable, DoubleWritable>>) edges).size()); + } else { + initialize(); + } + for (Edge<LongWritable, DoubleWritable> edge : edges) { + add(edge); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java index a3b869a..528acb2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullArrayEdges.java @@ -45,19 +45,17 @@ public class LongNullArrayEdges @Override public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) { - if (edges != null) { - // If the iterable is actually a collection, we can cheaply get the - // size and initialize the arrays with the expected capacity. - if (edges instanceof Collection) { - int numEdges = - ((Collection<Edge<LongWritable, NullWritable>>) edges).size(); - initialize(numEdges); - } else { - initialize(); - } - for (Edge<LongWritable, NullWritable> edge : edges) { - add(edge); - } + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the arrays with the expected capacity. 
+ if (edges instanceof Collection) { + int numEdges = + ((Collection<Edge<LongWritable, NullWritable>>) edges).size(); + initialize(numEdges); + } else { + initialize(); + } + for (Edge<LongWritable, NullWritable> edge : edges) { + add(edge); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java index 70e69c4..26c57ae 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongNullHashSetEdges.java @@ -47,18 +47,16 @@ public class LongNullHashSetEdges @Override public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) { - if (edges != null) { - // If the iterable is actually a collection, we can cheaply get the - // size and initialize the hash-map with the expected capacity. - if (edges instanceof Collection) { - initialize( - ((Collection<Edge<LongWritable, NullWritable>>) edges).size()); - } else { - initialize(); - } - for (Edge<LongWritable, NullWritable> edge : edges) { - add(edge); - } + // If the iterable is actually a collection, we can cheaply get the + // size and initialize the hash-map with the expected capacity. 
+ if (edges instanceof Collection) { + initialize( + ((Collection<Edge<LongWritable, NullWritable>>) edges).size()); + } else { + initialize(); + } + for (Edge<LongWritable, NullWritable> edge : edges) { + add(edge); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c14e524b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java index 6fe9041..66f081a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java @@ -357,7 +357,7 @@ public abstract class Vertex<I extends WritableComparable, * @param value Vertex value */ public void addVertexRequest(I id, V value) throws IOException { - addVertexRequest(id, value, conf.createVertexEdges()); + addVertexRequest(id, value, conf.createAndInitializeVertexEdges()); } /**
