Author: edwardyoon
Date: Sun May 31 21:50:59 2015
New Revision: 1682802
URL: http://svn.apache.org/r1682802
Log:
HAMA-959: Change to atomic counter from sync counter in MapVerticesInfo
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1682802&r1=1682801&r2=1682802&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun May 31 21:50:59 2015
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -108,6 +109,10 @@ public final class GraphJobRunner<V exte
private int maxIteration = -1;
private long iteration = 0;
+ // global counter for thread exceptions
+ // TODO find more graceful way to handle thread exceptions.
+ private AtomicInteger errorCount = new AtomicInteger(0);
+
private AggregationRunner<V, E, M> aggregationRunner;
private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
private Combiner<Writable> combiner;
@@ -116,6 +121,10 @@ public final class GraphJobRunner<V exte
private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
+ // Below maps are used for grouping messages into single GraphJobMessage, based on vertex ID.
+ private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<Integer, GraphJobMessage>();
+ private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V, GraphJobMessage>();
+
@Override
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -242,7 +251,7 @@ public final class GraphJobRunner<V exte
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- this.errorCount = 0;
+ this.errorCount.set(0);
long startTime = System.currentTimeMillis();
this.changedVertexCnt = 0;
@@ -269,7 +278,7 @@ public final class GraphJobRunner<V exte
throw new IOException(e);
}
- if (errorCount > 0) {
+ if (errorCount.get() > 0) {
throw new IOException("there were " + errorCount
+ " exceptions during compute vertices.");
}
@@ -305,7 +314,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
this.changedVertexCnt = 0;
- this.errorCount = 0;
+ this.errorCount.set(0);
vertices.startSuperstep();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -324,7 +333,7 @@ public final class GraphJobRunner<V exte
throw new IOException(e);
}
- if (errorCount > 0) {
+ if (errorCount.get() > 0) {
throw new IOException("there were " + errorCount
+ " exceptions during compute vertices.");
}
@@ -334,10 +343,8 @@ public final class GraphJobRunner<V exte
finishSuperstep();
}
- private int errorCount = 0;
-
- public synchronized void incrementErrorCount() {
- errorCount++;
+ public void incrementErrorCount() {
+ errorCount.incrementAndGet();
}
class ComputeRunnable implements Runnable {
@@ -430,8 +437,6 @@ public final class GraphJobRunner<V exte
EDGE_VALUE_CLASS = edgeValueClass;
}
- private final ConcurrentHashMap<Integer, GraphJobMessage> messages = new ConcurrentHashMap<Integer, GraphJobMessage>();
-
/**
* Loads vertices into memory of each peer.
*/
@@ -441,7 +446,7 @@ public final class GraphJobRunner<V exte
throws IOException, SyncException, InterruptedException {
for (int i = 0; i < peer.getNumPeers(); i++) {
- messages.put(i, new GraphJobMessage());
+ partitionMessages.put(i, new GraphJobMessage());
}
VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
@@ -479,7 +484,7 @@ public final class GraphJobRunner<V exte
executor.awaitTermination(60, TimeUnit.SECONDS);
Iterator<Entry<Integer, GraphJobMessage>> it;
- it = messages.entrySet().iterator();
+ it = partitionMessages.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, GraphJobMessage> e = it.next();
it.remove();
@@ -547,7 +552,7 @@ public final class GraphJobRunner<V exte
if (peer.getPeerIndex() == partition) {
addVertex(vertex);
} else {
- messages.get(partition).add(WritableUtils.serialize(vertex));
+ partitionMessages.get(partition).add(WritableUtils.serialize(vertex));
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -690,20 +695,18 @@ public final class GraphJobRunner<V exte
vertices.finishAdditions();
}
- private final ConcurrentHashMap<V, GraphJobMessage> storage = new ConcurrentHashMap<V, GraphJobMessage>();
-
public void sendMessage(V vertexID, byte[] msg) throws IOException {
- if (!storage.containsKey(vertexID)) {
+ if (!vertexMessages.containsKey(vertexID)) {
// To save bit memory we don't set vertexID twice
- storage.putIfAbsent(vertexID, new GraphJobMessage());
+ vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
}
- storage.get(vertexID).add(msg);
+ vertexMessages.get(vertexID).add(msg);
}
public void finishSuperstep() throws IOException {
vertices.finishSuperstep();
- Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
+ Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
while (it.hasNext()) {
Entry<V, GraphJobMessage> e = it.next();
it.remove();
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1682802&r1=1682801&r2=1682802&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Sun May 31 21:50:59 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +45,7 @@ public final class MapVerticesInfo<V ext
private GraphJobRunner<V, E, M> runner;
- private int activeVertices = 0;
+ private AtomicInteger activeVertices = new AtomicInteger(0);
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -132,8 +133,8 @@ public final class MapVerticesInfo<V ext
vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
}
- public synchronized void incrementCount() {
- activeVertices++;
+ public void incrementCount() {
+ activeVertices.incrementAndGet();
}
@Override
@@ -150,10 +151,10 @@ public final class MapVerticesInfo<V ext
@Override
public void finishSuperstep() throws IOException {
- activeVertices = 0;
+ activeVertices.set(0);
}
public int getActiveVerticesNum() {
- return activeVertices;
+ return activeVertices.get();
}
}