Author: tjungblut
Date: Thu Aug 16 16:51:39 2012
New Revision: 1373915
URL: http://svn.apache.org/viewvc?rev=1373915&view=rev
Log:
[HAMA-593]: Improve RPC scalability (Mayank Mishra via tjungblut)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Aug 16 16:51:39 2012
@@ -34,7 +34,8 @@ Release 0.5 - April 10, 2012
HAMA-595: Fix NullPointerException in Task Scheduler (surajmenon)
IMPROVEMENTS
-
+
+ HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
HAMA-584: Change Pagerank IO format to human-readable text for easy debug
(tjungblut via edwardyoon)
HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
HAMA-582: Task's error logs should be displayed on client-end when job is
failed (edwardyoon)
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu Aug 16 16:51:39 2012
@@ -140,6 +140,16 @@
<value>10000</value>
<description>The default timeout period for checking groom server
health.</description>
</property>
+
+ <property>
+ <name>hama.messenger.max.cached.connections</name>
+ <value>100</value>
+ <description>This changes the maximum number of connections that are cached
+ between the peers, normally a LRU cache is used. This affects the memory
+ consumption per task and the performance. Increasing it will give you a speed-up
+ but it trades more memory.
+ </description>
+ </property>
<!--
Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Aug 16 16:51:39 2012
@@ -64,6 +64,9 @@ public abstract class AbstractMessageMan
// the task attempt id
protected TaskAttemptID attemptId;
+ // to maximum cached connections in the concrete message manager
+ protected int maxCachedConnections = 100;
+
// List of listeners for all the sent messages
protected Queue<MessageEventListener<M>> messageListenerQueue;
@@ -81,9 +84,9 @@ public abstract class AbstractMessageMan
this.peer = peer;
this.conf = conf;
this.peerAddress = peerAddress;
- localQueue = getQueue();
- localQueueForNextIteration = getSynchronizedQueue();
-
+ this.localQueue = getQueue();
+ this.localQueueForNextIteration = getSynchronizedQueue();
+ this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
}
/*
@@ -252,36 +255,30 @@ public abstract class AbstractMessageMan
}
}
-
-
@Override
public void registerListener(MessageEventListener<M> listener)
throws IOException {
- if(listener != null)
+ if (listener != null)
this.messageListenerQueue.add(listener);
-
+
}
- @SuppressWarnings("unchecked")
@Override
- public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException{
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+ throws IOException {
for (Writable message : bundle.getMessages()) {
- loopBackMessage((M)message);
+ loopBackMessage(message);
}
-
+
}
@SuppressWarnings("unchecked")
@Override
- public void loopBackMessage(Writable message) throws IOException{
- this.localQueueForNextIteration.add((M)message);
+ public void loopBackMessage(Writable message) throws IOException {
+ this.localQueueForNextIteration.add((M) message);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
- notifyReceivedMessage((M)message);
-
+ notifyReceivedMessage((M) message);
+
}
-
-
-
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Thu Aug 16 16:51:39 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
@@ -38,20 +39,39 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.util.LRUCache;
public final class AvroMessageManagerImpl<M extends Writable> extends
CompressableMessageManager<M> implements Sender<M> {
private NettyServer server = null;
+ // also cache the senders, getting a new sender from a transceiver generates
+ // exceptions
private final HashMap<InetSocketAddress, Sender<M>> peers = new HashMap<InetSocketAddress, Sender<M>>();
+ private LRUCache<InetSocketAddress, NettyTransceiver> peersLRUCache;
+ @SuppressWarnings("serial")
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress addr) {
super.init(attemptId, peer, conf, addr);
super.initCompression(conf);
server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
+ peersLRUCache = new LRUCache<InetSocketAddress, NettyTransceiver>(
+ maxCachedConnections) {
+ @Override
+ protected final boolean removeEldestEntry(
+ Map.Entry<InetSocketAddress, NettyTransceiver> eldest) {
+ if (size() > this.capacity) {
+ NettyTransceiver client = eldest.getValue();
+ client.close();
+ peers.remove(eldest.getKey());
+ return true;
+ }
+ return false;
+ }
+ };
}
@Override
@@ -64,21 +84,32 @@ public final class AvroMessageManagerImp
this.loopBackMessages(messages);
}
- @SuppressWarnings("unchecked")
@Override
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
AvroBSPMessageBundle<M> msg = new AvroBSPMessageBundle<M>();
msg.setData(serializeMessage(bundle));
- Sender<M> sender = peers.get(addr);
+ Sender<M> sender = getSender(addr);
+ sender.transfer(msg);
+ }
- if (sender == null) {
- NettyTransceiver client = new NettyTransceiver(addr);
- sender = SpecificRequestor.getClient(Sender.class, client);
+ /**
+ * @param addr, socket address to which BSP Peer Connection will be
+ * established
+ * @return BSP Peer Connection, tried to return cached connection, else
+ * returns a new connection and caches it
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ private final Sender<M> getSender(InetSocketAddress addr) throws IOException {
+ NettyTransceiver client = peersLRUCache.get(addr);
+ if (client == null) {
+ client = new NettyTransceiver(addr);
+ Sender<M> sender = SpecificRequestor.getClient(Sender.class, client);
+ peersLRUCache.put(addr, client);
peers.put(addr, sender);
}
-
- sender.transfer(msg);
+ return peers.get(addr);
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java Thu Aug 16 16:51:39 2012
@@ -27,7 +27,8 @@ import org.apache.hama.bsp.message.compr
*
* @param <M>
*/
-public abstract class CompressableMessageManager<M extends Writable> extends AbstractMessageManager<M> {
+public abstract class CompressableMessageManager<M extends Writable> extends
+ AbstractMessageManager<M> {
protected BSPMessageCompressor<M> compressor;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Aug 16 16:51:39 2012
@@ -19,7 +19,7 @@ package org.apache.hama.bsp.message;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +33,7 @@ import org.apache.hama.bsp.TaskAttemptID
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.CompressionUtil;
+import org.apache.hama.util.LRUCache;
/**
* Implementation of the {@link HadoopMessageManager}.
@@ -44,16 +45,30 @@ public final class HadoopMessageManagerI
private static final Log LOG = LogFactory
.getLog(HadoopMessageManagerImpl.class);
- private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new HashMap<InetSocketAddress, HadoopMessageManager<M>>();
-
private Server server = null;
+ private LRUCache<InetSocketAddress, HadoopMessageManager<M>> peersLRUCache = null;
+
+ @SuppressWarnings("serial")
@Override
public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
super.init(attemptId, peer, conf, peerAddress);
super.initCompression(conf);
startRPCServer(conf, peerAddress);
+ peersLRUCache = new LRUCache<InetSocketAddress, HadoopMessageManager<M>>(
+ maxCachedConnections) {
+ @Override
+ protected final boolean removeEldestEntry(
+ Map.Entry<InetSocketAddress, HadoopMessageManager<M>> eldest) {
+ if (size() > this.capacity) {
+ HadoopMessageManager<M> proxy = eldest.getValue();
+ RPC.stopProxy(proxy);
+ return true;
+ }
+ return false;
+ }
+ };
}
private final void startRPCServer(Configuration conf,
@@ -83,7 +98,6 @@ public final class HadoopMessageManagerI
throws IOException {
HadoopMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
-
if (bspPeerConnection == null) {
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
@@ -101,16 +115,26 @@ public final class HadoopMessageManagerI
}
}
+ /**
+ * @param addr, socket address to which BSP Peer Connection will be
+ * established
+ * @return BSP Peer Connection, tried to return cached connection, else
+ * returns a new connection and caches it
+ * @throws IOException
+ */
@SuppressWarnings("unchecked")
protected final HadoopMessageManager<M> getBSPPeerConnection(
InetSocketAddress addr) throws IOException {
- HadoopMessageManager<M> peer = peers.get(addr);
- if (peer == null) {
- peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
- HamaRPCProtocolVersion.versionID, addr, this.conf);
- this.peers.put(addr, peer);
+ HadoopMessageManager<M> bspPeerConnection;
+ if (!peersLRUCache.containsKey(addr)) {
+ bspPeerConnection = (HadoopMessageManager<M>) RPC.getProxy(
+ HadoopMessageManager.class, HamaRPCProtocolVersion.versionID, addr,
+ this.conf);
+ peersLRUCache.put(addr, bspPeerConnection);
+ } else {
+ bspPeerConnection = peersLRUCache.get(addr);
}
- return peer;
+ return bspPeerConnection;
}
@Override
@@ -135,5 +159,4 @@ public final class HadoopMessageManagerI
return versionID;
}
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java Thu Aug 16 16:51:39 2012
@@ -100,7 +100,7 @@ public final class MemoryQueue<M extends
@Override
public void prepareWrite() {
-
+
}
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Aug 16 16:51:39 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.TaskAttemptID
public interface MessageManager<M extends Writable> {
public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
+ public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections";
/**
* Init can be used to start servers and initialize internal state. If you are
@@ -99,8 +100,9 @@ public interface MessageManager<M extend
/**
* Send the messages to self to receive in the next superstep.
*/
- public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
-
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+ throws IOException;
+
/**
* Send the message to self to receive in the next superstep.
*/
@@ -115,6 +117,5 @@ public interface MessageManager<M extend
*/
public void registerListener(MessageEventListener<M> listener)
throws IOException;
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Thu Aug 16 16:51:39 2012
@@ -20,7 +20,7 @@ package org.apache.hama.bsp.message;
import org.apache.hadoop.io.Writable;
public interface Sender<M extends Writable> {
-
+
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol
.parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}");