Author: edwardyoon
Date: Wed Jan 22 02:14:22 2014
New Revision: 1560236
URL: http://svn.apache.org/r1560236
Log:
Minor refactor.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Wed Jan 22 02:14:22 2014
@@ -30,11 +30,9 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
-import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
@@ -80,7 +78,6 @@ public abstract class AbstractMessageMan
* TaskAttemptID, org.apache.hama.bsp.BSPPeer,
* org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
*/
- @SuppressWarnings("unchecked")
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
@@ -92,18 +89,7 @@ public abstract class AbstractMessageMan
this.localQueueForNextIteration = getSynchronizedReceiverQueue();
this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
this.outgoingMessageManager = getOutgoingMessageManager();
-
- final String combinerName = conf.get(Constants.COMBINER_CLASS);
- if (combinerName != null) {
- try {
- Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
- .getClassByName(combinerName));
- this.outgoingMessageManager.setCombiner(combiner);
- } catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ this.outgoingMessageManager.init(conf);
}
/*
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
Wed Jan 22 02:14:22 2014
@@ -21,18 +21,19 @@ import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.Combiner;
public interface OutgoingMessageManager<M extends Writable> {
- public void setCombiner(Combiner<M> combiner);
-
+ public void init(Configuration conf);
+
public void addMessage(String peerName, M msg);
public void clear();
public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>>
getBundleIterator();
+
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Wed Jan 22 02:14:22 2014
@@ -22,10 +22,13 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.ReflectionUtils;
public class OutgoingPOJOMessageBundle<M extends Writable> implements
OutgoingMessageManager<M> {
@@ -34,13 +37,38 @@ public class OutgoingPOJOMessageBundle<M
private final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles =
new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
+ @SuppressWarnings("unchecked")
@Override
- public void setCombiner(Combiner<M> combiner) {
- this.combiner = combiner;
+ public void init(Configuration conf) {
+ final String combinerName = conf.get(Constants.COMBINER_CLASS);
+ if (combinerName != null) {
+ try {
+ Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+ .getClassByName(combinerName));
+ this.combiner = combiner;
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
}
@Override
public void addMessage(String peerName, M msg) {
+ InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+ if (combiner != null) {
+ BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
+ bundle.addMessage(msg);
+ BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+ combined.addMessage(combiner.combine(bundle.getMessages()));
+ outgoingBundles.put(targetPeerAddress, combined);
+ } else {
+ outgoingBundles.get(targetPeerAddress).addMessage(msg);
+ }
+ }
+
+ private InetSocketAddress getSocketAddress(String peerName) {
InetSocketAddress targetPeerAddress = null;
// Get socket for target peer.
if (peerSocketCache.containsKey(peerName)) {
@@ -53,16 +81,7 @@ public class OutgoingPOJOMessageBundle<M
if (!outgoingBundles.containsKey(targetPeerAddress)) {
outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
}
-
- if (combiner != null) {
- BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
- bundle.addMessage(msg);
- BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
- combined.addMessage(combiner.combine(bundle.getMessages()));
- outgoingBundles.put(targetPeerAddress, combined);
- } else {
- outgoingBundles.get(targetPeerAddress).addMessage(msg);
- }
+ return targetPeerAddress;
}
@Override