Author: edwardyoon
Date: Tue Apr 21 23:20:11 2015
New Revision: 1675208
URL: http://svn.apache.org/r1675208
Log:
Add a RejectedExecutionHandler that retries rejected tasks
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675208&r1=1675207&r2=1675208&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Apr 21 23:20:11 2015
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
@@ -115,6 +116,8 @@ public final class GraphJobRunner<V exte
private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
peer;
+ private RejectedExecutionHandler retryHandler = new
RetryRejectedExecutionHandler();
+
@Override
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -252,6 +255,7 @@ public final class GraphJobRunner<V exte
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
+ executor.setRejectedExecutionHandler(retryHandler);
long loopStartTime = System.currentTimeMillis();
while (currentMessage != null) {
@@ -302,7 +306,8 @@ public final class GraphJobRunner<V exte
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
-
+ executor.setRejectedExecutionHandler(retryHandler);
+
for (Vertex<V, E, M> v : vertices.getValues()) {
Runnable worker = new ComputeRunnable(v);
executor.execute(worker);
@@ -422,7 +427,8 @@ public final class GraphJobRunner<V exte
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 1024));
-
+ executor.setRejectedExecutionHandler(retryHandler);
+
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
@@ -707,6 +713,20 @@ public final class GraphJobRunner<V exte
};
}
+ class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ executor.execute(r);
+ }
+
+ }
+
/**
* @return the destination peer name of the destination of the given directed
* edge.