Author: edwardyoon
Date: Wed Apr 22 13:32:55 2015
New Revision: 1675359
URL: http://svn.apache.org/r1675359
Log:
Fix race conditions about ConcurrentHashMap
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
Wed Apr 22 13:32:55 2015
@@ -56,14 +56,14 @@ public class PageRank {
if (val != null) {
MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
}
+
+ // initialize this vertex to 1 / count of global vertices in this graph
+ setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
}
@Override
public void compute(Iterable<DoubleWritable> messages) throws IOException {
- // initialize this vertex to 1 / count of global vertices in this graph
- if (this.getSuperstepCount() == 0) {
- setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
- } else if (this.getSuperstepCount() >= 1) {
+ if (this.getSuperstepCount() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
@@ -110,14 +110,13 @@ public class PageRank {
public static class PagerankJsonReader extends
VertexInputReader<LongWritable, Text, Text, NullWritable,
DoubleWritable> {
- JSONParser parser = new JSONParser();
+ @SuppressWarnings("unchecked")
@Override
public boolean parseVertex(LongWritable key, Text value,
Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
-
- String strValue = value.toString();
- JSONArray jsonArray = (JSONArray) parser.parse(strValue);
+ JSONArray jsonArray = (JSONArray) new JSONParser()
+ .parse(value.toString());
vertex.setVertexID(new Text(jsonArray.get(0).toString()));
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Wed Apr 22 13:32:55 2015
@@ -74,7 +74,8 @@ public final class GraphJobMessage imple
this.flag = VERTEX_FLAG;
this.vertexId = vertexId;
- add(vertexValue);
+ if (vertexValue != null)
+ add(vertexValue);
}
public GraphJobMessage(IntWritable size) {
@@ -299,4 +300,8 @@ public final class GraphJobMessage imple
}
}
+ public void setFlag(int partitionFlag) {
+ this.flag = partitionFlag;
+ }
+
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Wed Apr 22 13:32:55 2015
@@ -23,9 +23,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -319,7 +317,7 @@ public final class GraphJobRunner<V exte
} catch (InterruptedException e) {
LOG.error(e);
}
-
+
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
finishSuperstep();
@@ -414,6 +412,9 @@ public final class GraphJobRunner<V exte
EDGE_VALUE_CLASS = edgeValueClass;
}
+ private final ConcurrentHashMap<String, GraphJobMessage> messages = new
ConcurrentHashMap<String, GraphJobMessage>();
+ private VertexInputReader<Writable, Writable, V, E, M> reader;
+
/**
* Loads vertices into memory of each peer.
*/
@@ -421,9 +422,8 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- final Map<String, GraphJobMessage> messages = new HashMap<String,
GraphJobMessage>();
-
- VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+
+ reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -435,37 +435,37 @@ public final class GraphJobRunner<V exte
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
-
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
boolean vertexFinished = reader.parseVertex(next.getKey(),
next.getValue(), vertex);
+
if (!vertexFinished) {
continue;
}
- String dstHost = getHostName(vertex.getVertexID());
- if (peer.getPeerName().equals(dstHost)) {
- Runnable worker = new LoadWorker(vertex);
- executor.execute(worker);
- } else {
- if (!messages.containsKey(dstHost)) {
- messages.put(dstHost, new GraphJobMessage(serialize(vertex)));
- } else {
- messages.get(dstHost).add(serialize(vertex));
- }
- }
+ Runnable worker = new LoadWorker(vertex);
+ executor.execute(worker);
+
}
} catch (Exception e) {
e.printStackTrace();
}
- for (Entry<String, GraphJobMessage> e : messages.entrySet()) {
- peer.send(e.getKey(), e.getValue());
+ executor.shutdown();
+ executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+
+ Iterator<Entry<String, GraphJobMessage>> it;
+ it = messages.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, GraphJobMessage> e = it.next();
+ it.remove();
+ GraphJobMessage msg = e.getValue();
+ msg.setFlag(GraphJobMessage.PARTITION_FLAG);
+ peer.send(e.getKey(), msg);
}
- messages.clear();
-
+
peer.sync();
GraphJobMessage msg;
@@ -477,12 +477,9 @@ public final class GraphJobRunner<V exte
Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
vertex.readFields(dis);
- Runnable worker = new LoadWorker(vertex);
- executor.execute(worker);
+ addVertex(vertex);
}
}
- executor.shutdown();
- executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
@@ -498,7 +495,16 @@ public final class GraphJobRunner<V exte
@Override
public void run() {
try {
- addVertex(vertex);
+ String dstHost = getHostName(vertex.getVertexID());
+ if (peer.getPeerName().equals(dstHost)) {
+ addVertex(vertex);
+ } else {
+ if (!messages.containsKey(dstHost)) {
+ messages.putIfAbsent(dstHost, new GraphJobMessage());
+ }
+ messages.get(dstHost).add(serialize(vertex));
+ }
+
} catch (IOException e) {
e.printStackTrace();
}
@@ -644,12 +650,11 @@ public final class GraphJobRunner<V exte
private final ConcurrentHashMap<V, GraphJobMessage> storage = new
ConcurrentHashMap<V, GraphJobMessage>();
public void sendMessage(V vertexID, byte[] msg) throws IOException {
- if (storage.containsKey(vertexID)) {
- storage.get(vertexID).add(msg);
- } else {
+ if (!storage.containsKey(vertexID)) {
// To save bit memory we don't set vertexID twice
- storage.put(vertexID, new GraphJobMessage(null, msg));
+ storage.putIfAbsent(vertexID, new GraphJobMessage());
}
+ storage.get(vertexID).add(msg);
}
public void finishSuperstep() throws IOException {
@@ -659,21 +664,21 @@ public final class GraphJobRunner<V exte
while (it.hasNext()) {
Entry<V, GraphJobMessage> e = it.next();
it.remove();
-
+
if (combiner != null && e.getValue().getNumOfValues() > 1) {
- peer.send(
- getHostName(e.getKey()),
- new GraphJobMessage(e.getKey(), serialize(combiner
- .combine(getIterableMessages(e.getValue().getValuesBytes(), e
- .getValue().getNumOfValues())))));
+ GraphJobMessage combined = new GraphJobMessage(e.getKey(),
+ serialize(combiner.combine(getIterableMessages(e.getValue()
+ .getValuesBytes(), e.getValue().getNumOfValues()))));
+ combined.setFlag(GraphJobMessage.VERTEX_FLAG);
+ peer.send(getHostName(e.getKey()), combined);
} else {
// set vertexID
e.getValue().setVertexId(e.getKey());
+ e.getValue().setFlag(GraphJobMessage.VERTEX_FLAG);
peer.send(getHostName(e.getKey()), e.getValue());
}
}
- storage.clear();
-
+
if (isMasterTask(peer)) {
peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Wed Apr 22 13:32:55 2015
@@ -69,11 +69,11 @@ public class IncomingVertexMessageManage
@Override
public void add(GraphJobMessage item) {
if (item.isVertexMessage()) {
- if (storage.containsKey(item.getVertexId())) {
+ if (!storage.containsKey(item.getVertexId())) {
+ storage.putIfAbsent(item.getVertexId(), item);
+ } else {
storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(),
item.size());
- } else {
- storage.put(item.getVertexId(), item);
}
} else {
mapMessages.add(item);
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Wed Apr 22 13:32:55 2015
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,7 +43,7 @@ import com.google.common.collect.Sets;
*/
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
- private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V,
Vertex<V, E, M>>();
+ private final ConcurrentHashMap<V, Vertex<V, E, M>> vertices = new
ConcurrentHashMap<V, Vertex<V, E, M>>();
private Set<V> computedVertices = new HashSet<V>();
@@ -55,11 +54,12 @@ public final class MapVerticesInfo<V ext
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
- if (vertices.containsKey(vertex.getVertexID())) {
- for (Edge<V, E> e : vertex.getEdges())
- vertices.get(vertex.getVertexID()).addEdge(e);
+ if (!vertices.containsKey(vertex.getVertexID())) {
+ vertices.putIfAbsent(vertex.getVertexID(), vertex);
} else {
- vertices.put(vertex.getVertexID(), vertex);
+ for (Edge<V, E> e : vertex.getEdges()) {
+ vertices.get(vertex.getVertexID()).addEdge(e);
+ }
}
}