Author: edwardyoon
Date: Tue Jan 21 01:28:58 2014
New Revision: 1559885
URL: http://svn.apache.org/r1559885
Log:
HAMA-853: Refactor Outgoing message manager (edwardyoon)
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(with props)
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(with props)
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueueTransfer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueueTransfer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueueTransfer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransfer.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Jan 21 01:28:58 2014
@@ -23,6 +23,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-853: Refactor Outgoing message manager (edwardyoon)
HAMA-852: Add MessageClass property in BSPJob (Martin Illecker)
HAMA-843: Message communication overhead between master aggregation and
vertex computation supersteps (edwardyoon)
HAMA-838: Refactor aggregators (Anastasis Andronidis)
Modified: hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/matrixmultiplication.xml?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
(original)
+++ hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml Tue
Jan 21 01:28:58 2014
@@ -58,8 +58,8 @@
<value>org.apache.hama.commons.io.PipesKeyValueWritable</value>
</property>
<property>
- <name>hama.messenger.xfer.queue.class</name>
- <value>org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer</value>
+ <name>hama.messenger.receive.queue.class</name>
+ <value>org.apache.hama.bsp.message.queue.SortedMemoryQueue</value>
</property>
<property>
<name>bsp.input.partitioner.class</name>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Jan
21 01:28:58 2014
@@ -42,7 +42,6 @@ import org.apache.hama.bsp.ft.BSPFaultTo
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -90,7 +89,6 @@ public final class BSPPeerImpl<K1, V1, K
private InetSocketAddress peerAddress;
private Counters counters;
- private Combiner<M> combiner;
private FaultTolerantPeerService<M> faultToleranceService;
@@ -122,7 +120,8 @@ public final class BSPPeerImpl<K1, V1, K
* @param dfs is the Hadoop FileSystem.
* @param counters is the counters from outside.
*/
- public BSPPeerImpl(final HamaConfiguration conf, FileSystem dfs, Counters
counters) {
+ public BSPPeerImpl(final HamaConfiguration conf, FileSystem dfs,
+ Counters counters) {
this(conf, dfs);
this.counters = counters;
}
@@ -169,14 +168,14 @@ public final class BSPPeerImpl<K1, V1, K
// This function call may change the current peer address
initializeMessaging();
-
+
conf.set(Constants.PEER_HOST, peerAddress.getHostName());
conf.setInt(Constants.PEER_PORT, peerAddress.getPort());
if (LOG.isDebugEnabled()) {
LOG.debug("Initialized Messaging service.");
}
-
+
initializeIO();
initializeSyncService(superstep, state);
@@ -190,12 +189,6 @@ public final class BSPPeerImpl<K1, V1, K
setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
stateString, peerAddress.getHostName(), phase, counters));
- final String combinerName = conf.get(Constants.COMBINER_CLASS);
- if (combinerName != null) {
- combiner = (Combiner<M>) ReflectionUtils.newInstance(
- conf.getClassByName(combinerName), conf);
- }
-
if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fault tolerance enabled.");
@@ -372,16 +365,15 @@ public final class BSPPeerImpl<K1, V1, K
InterruptedException {
// normally all messages should been send now, finalizing the send phase
- messenger.finishSendPhase();
- Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
- .getMessageIterator();
+ Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it = messenger
+ .getOutgoingBundles();
while (it.hasNext()) {
- Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
+ Entry<InetSocketAddress, BSPMessageBundle<M>> entry = it.next();
final InetSocketAddress addr = entry.getKey();
- final Iterable<M> messages = entry.getValue();
- final BSPMessageBundle<M> bundle = combineMessages(messages);
+ final BSPMessageBundle<M> bundle = entry.getValue();
+
// remove this message during runtime to save a bit of memory
it.remove();
try {
@@ -389,10 +381,6 @@ public final class BSPPeerImpl<K1, V1, K
} catch (Exception e) {
LOG.error("Error while sending messages", e);
}
- MessageQueue<M> msgQueue = (MessageQueue<M>) messages;
- if (msgQueue != null) {
- msgQueue.close();
- }
}
if (this.faultToleranceService != null) {
@@ -415,7 +403,7 @@ public final class BSPPeerImpl<K1, V1, K
}
// Clear outgoing queues.
- messenger.clearOutgoingQueues();
+ messenger.clearOutgoingMessages();
leaveBarrier();
@@ -437,18 +425,6 @@ public final class BSPPeerImpl<K1, V1, K
}
- private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
- if (combiner != null) {
- bundle.addMessage(combiner.combine(messages));
- } else {
- for (M message : messages) {
- bundle.addMessage(message);
- }
- }
- return bundle;
- }
-
protected final void enterBarrier() throws SyncException {
syncClient.enterBarrier(taskId.getJobID(), taskId,
currentTaskStatus.getSuperstepCount());
@@ -520,7 +496,7 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final void clear() {
- messenger.clearOutgoingQueues();
+ messenger.clearOutgoingMessages();
}
/**
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -19,8 +19,6 @@ package org.apache.hama.bsp.message;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map.Entry;
@@ -32,16 +30,18 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
-import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.ReflectionUtils;
/**
* Abstract baseclass that should contain all information and services needed
@@ -56,9 +56,8 @@ public abstract class AbstractMessageMan
// conf is injected via reflection of the factory
protected Configuration conf;
- protected final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
- protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues =
new HashMap<InetSocketAddress, MessageQueue<M>>();
+ protected OutgoingMessageManager<M> outgoingMessageManager;
protected MessageQueue<M> localQueue;
// this must be a synchronized implementation: this is accessed per RPC
@@ -81,6 +80,7 @@ public abstract class AbstractMessageMan
* TaskAttemptID, org.apache.hama.bsp.BSPPeer,
* org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
*/
+ @SuppressWarnings("unchecked")
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
@@ -91,6 +91,19 @@ public abstract class AbstractMessageMan
this.localQueue = getReceiverQueue();
this.localQueueForNextIteration = getSynchronizedReceiverQueue();
this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
+ this.outgoingMessageManager = getOutgoingMessageManager();
+
+ final String combinerName = conf.get(Constants.COMBINER_CLASS);
+ if (combinerName != null) {
+ try {
+ Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+ .getClassByName(combinerName));
+ this.outgoingMessageManager.setCombiner(combiner);
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
}
/*
@@ -100,10 +113,7 @@ public abstract class AbstractMessageMan
@Override
public void close() {
try {
- Collection<MessageQueue<M>> values = outgoingQueues.values();
- for (MessageQueue<M> msgQueue : values) {
- msgQueue.close();
- }
+ outgoingMessageManager.clear();
localQueue.close();
// remove possible disk queues from the path
try {
@@ -121,18 +131,6 @@ public abstract class AbstractMessageMan
/*
* (non-Javadoc)
- * @see org.apache.hama.bsp.message.MessageManager#finishSendPhase()
- */
- @Override
- public void finishSendPhase() throws IOException {
- Collection<MessageQueue<M>> values = outgoingQueues.values();
- for (MessageQueue<M> msgQueue : values) {
- msgQueue.prepareRead();
- }
- }
-
- /*
- * (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage()
*/
@Override
@@ -154,7 +152,7 @@ public abstract class AbstractMessageMan
* @see org.apache.hama.bsp.message.MessageManager#clearOutgoingQueues()
*/
@Override
- public final void clearOutgoingQueues() {
+ public final void clearOutgoingMessages() {
if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
&& localQueue.size() > 0) {
@@ -199,21 +197,8 @@ public abstract class AbstractMessageMan
*/
@Override
public void send(String peerName, M msg) throws IOException {
- InetSocketAddress targetPeerAddress = null;
- // Get socket for target peer.
- if (peerSocketCache.containsKey(peerName)) {
- targetPeerAddress = peerSocketCache.get(peerName);
- } else {
- targetPeerAddress = BSPNetUtils.getAddress(peerName);
- peerSocketCache.put(peerName, targetPeerAddress);
- }
- MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
- if (queue == null) {
- queue = getSenderQueue();
- }
- queue.add(msg);
+ outgoingMessageManager.addMessage(peerName, msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
- outgoingQueues.put(targetPeerAddress, queue);
notifySentMessage(peerName, msg);
}
@@ -222,24 +207,16 @@ public abstract class AbstractMessageMan
* @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
*/
@Override
- public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>>
getMessageIterator() {
- return this.outgoingQueues.entrySet().iterator();
+ public final Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>>
getOutgoingBundles() {
+ return this.outgoingMessageManager.getBundleIterator();
}
- /**
- * Returns a new queue implementation based on what was configured. If
nothing
- * has been configured for "hama.messenger.queue.class" then the
- * {@link MemoryQueue} is used. If you have scalability issues, then better
- * use {@link DiskQueue}.
- *
- * @return a <b>new</b> queue implementation.
- */
- protected MessageQueue<M> getSenderQueue() {
+ protected OutgoingMessageManager<M> getOutgoingMessageManager() {
@SuppressWarnings("unchecked")
- MessageQueue<M> queue = MessageTransferQueueFactory
- .getMessageTransferQueue(conf).getSenderQueue(conf);
- queue.init(conf, attemptId);
- return queue;
+ OutgoingMessageManager<M> messageManager = ReflectionUtils.newInstance(conf
+ .getClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+ OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class));
+ return messageManager;
}
/**
@@ -252,16 +229,13 @@ public abstract class AbstractMessageMan
*/
protected MessageQueue<M> getReceiverQueue() {
@SuppressWarnings("unchecked")
- MessageQueue<M> queue = MessageTransferQueueFactory
- .getMessageTransferQueue(conf).getReceiverQueue(conf);
+ MessageQueue<M> queue = ReflectionUtils.newInstance(conf.getClass(
+ MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class));
queue.init(conf, attemptId);
return queue;
}
- protected SynchronizedQueue<M> getSynchronizedSenderQueue() {
- return SingleLockQueue.synchronize(getSenderQueue());
- }
-
protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
return SingleLockQueue.synchronize(getReceiverQueue());
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
Tue Jan 21 01:28:58 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.MessageQueue;
/**
* This manager takes care of the messaging. It is responsible to launch a
@@ -36,9 +35,8 @@ import org.apache.hama.bsp.message.queue
*/
public interface MessageManager<M extends Writable> {
+ public static final String OUTGOING_MESSAGE_MANAGER_CLASS =
"hama.messenger.outgoing.message.manager.class";
public static final String RECEIVE_QUEUE_TYPE_CLASS =
"hama.messenger.receive.queue.class";
- public static final String SENDER_QUEUE_TYPE_CLASS =
"hama.messenger.sender.queue.class";
- public static final String TRANSFER_QUEUE_TYPE_CLASS =
"hama.messenger.xfer.queue.class";
public static final String MAX_CACHED_CONNECTIONS_KEY =
"hama.messenger.max.cached.connections";
/**
@@ -75,17 +73,10 @@ public interface MessageManager<M extend
public void send(String peerName, M msg) throws IOException;
/**
- * Should be called when all messages were send with send().
- *
- * @throws IOException
- */
- public void finishSendPhase() throws IOException;
-
- /**
- * Returns an iterator of messages grouped by peer.
+ * Returns an bundle of messages grouped by peer.
*
*/
- public Iterator<Entry<InetSocketAddress, MessageQueue<M>>>
getMessageIterator();
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>>
getOutgoingBundles();
/**
* This is the real transferring to a host with a bundle.
@@ -97,7 +88,7 @@ public interface MessageManager<M extend
/**
* Clears the outgoing queue. Can be used to switch queues.
*/
- public void clearOutgoingQueues();
+ public void clearOutgoingMessages();
/**
* Gets the number of messages in the current queue.
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1559885&view=auto
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(added)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+
+public interface OutgoingMessageManager<M extends Writable> {
+
+ public void setCombiner(Combiner<M> combiner);
+
+ public void addMessage(String peerName, M msg);
+
+ public void clear();
+
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>>
getBundleIterator();
+
+}
Propchange:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1559885&view=auto
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(added)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Tue Jan 21 01:28:58 2014
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingPOJOMessageBundle<M extends Writable> implements
+ OutgoingMessageManager<M> {
+
+ private Combiner<M> combiner;
+ private final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
+ private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles =
new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
+
+ @Override
+ public void setCombiner(Combiner<M> combiner) {
+ this.combiner = combiner;
+ }
+
+ @Override
+ public void addMessage(String peerName, M msg) {
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = BSPNetUtils.getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+
+ if (!outgoingBundles.containsKey(targetPeerAddress)) {
+ outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
+ }
+
+ if (combiner != null) {
+ BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
+ bundle.addMessage(msg);
+ BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+ combined.addMessage(combiner.combine(bundle.getMessages()));
+ outgoingBundles.put(targetPeerAddress, combined);
+ } else {
+ outgoingBundles.get(targetPeerAddress).addMessage(msg);
+ }
+ }
+
+ @Override
+ public void clear() {
+ outgoingBundles.clear();
+ }
+
+ @Override
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>>
getBundleIterator() {
+ return outgoingBundles.entrySet().iterator();
+ }
+
+}
Propchange:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
Tue Jan 21 01:28:58 2014
@@ -90,7 +90,7 @@ public final class MemoryQueue<M extends
@Override
public void close() {
-
+ this.clear();
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Tue Jan 21 01:28:58 2014
@@ -94,7 +94,7 @@ public final class SortedMemoryQueue<M e
@Override
public void close() {
-
+ this.clear();;
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
Tue Jan 21 01:28:58 2014
@@ -45,8 +45,7 @@ import org.apache.hama.bsp.message.io.Sp
*
* @param <M>
*/
-public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M>
- implements MessageTransferProtocol<M> {
+public class SpillingQueue<M extends Writable> extends
ByteArrayMessageQueue<M> {
private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
@@ -345,16 +344,6 @@ public class SpillingQueue<M extends Wri
}
@Override
- public MessageQueue<M> getSenderQueue(Configuration conf) {
- return this;
- }
-
- @Override
- public MessageQueue<M> getReceiverQueue(Configuration conf) {
- return this;
- }
-
- @Override
public void add(BSPMessageBundle<M> bundle) {
try {
this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue
Jan 21 01:28:58 2014
@@ -52,7 +52,6 @@ import org.apache.hama.bsp.ft.AsyncRcvdM
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
@@ -102,11 +101,7 @@ public class TestCheckpoint extends Test
}
@Override
- public void finishSendPhase() throws IOException {
- }
-
- @Override
- public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>>
getMessageIterator() {
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<Text>>>
getOutgoingBundles() {
return null;
}
@@ -118,7 +113,7 @@ public class TestCheckpoint extends Test
}
@Override
- public void clearOutgoingQueues() {
+ public void clearOutgoingMessages() {
}
@Override
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue
Jan 21 01:28:58 2014
@@ -39,32 +39,32 @@ public class TestPersistQueue extends Te
public void testDiskQueue() throws Exception {
BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.DiskQueueTransfer");
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.DiskQueue");
assertTrue(bsp.waitForCompletion(true));
}
public void testMemoryQueue() throws Exception {
BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.MemoryQueueTransfer");
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.MemoryQueue");
assertTrue(bsp.waitForCompletion(true));
}
public void testSortedQueue() throws Exception {
BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer");
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.SortedMemoryQueue");
assertTrue(bsp.waitForCompletion(true));
}
public void testSpillingQueue() throws Exception {
BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.SpillingQueueTransfer");
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.SpillingQueue");
assertTrue(bsp.waitForCompletion(true));
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -34,10 +34,8 @@ import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskQueue;
-import org.apache.hama.bsp.message.queue.DiskQueueTransfer;
-import org.apache.hama.bsp.message.queue.MemoryQueueTransfer;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferProtocol;
import org.apache.hama.util.BSPNetUtils;
public class TestHamaMessageManager extends TestCase {
@@ -49,8 +47,8 @@ public class TestHamaMessageManager exte
public void testMemoryMessaging() throws Exception {
HamaConfiguration conf = new HamaConfiguration();
- conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- MemoryQueueTransfer.class, MessageTransferProtocol.class);
+ conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class);
conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
messagingInternal(conf);
}
@@ -58,12 +56,13 @@ public class TestHamaMessageManager exte
public void testDiskMessaging() throws Exception {
HamaConfiguration conf = new HamaConfiguration();
conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
- conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- DiskQueueTransfer.class, MessageTransferProtocol.class);
+ conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
+ MessageQueue.class);
messagingInternal(conf);
}
- private static void messagingInternal(HamaConfiguration conf) throws
Exception {
+ private static void messagingInternal(HamaConfiguration conf)
+ throws Exception {
conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
"org.apache.hama.bsp.message.HamaMessageManagerImpl");
MessageManager<IntWritable> messageManager = MessageManagerFactory
@@ -86,24 +85,24 @@ public class TestHamaMessageManager exte
System.out.println("Peer is " + peerName);
messageManager.send(peerName, new IntWritable(1337));
- Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>>
messageIterator = messageManager
- .getMessageIterator();
+ Iterator<Entry<InetSocketAddress, BSPMessageBundle<IntWritable>>>
messageIterator = messageManager
+ .getOutgoingBundles();
- Entry<InetSocketAddress, MessageQueue<IntWritable>> entry = messageIterator
+ Entry<InetSocketAddress, BSPMessageBundle<IntWritable>> entry =
messageIterator
.next();
assertEquals(entry.getKey(), peer);
- assertTrue(entry.getValue().size() == 1);
+ assertTrue(entry.getValue().getMessages().size() == 1);
BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
- for (IntWritable msg : entry.getValue()) {
+ for (IntWritable msg : entry.getValue().getMessages()) {
bundle.addMessage(msg);
}
messageManager.transfer(peer, bundle);
- messageManager.clearOutgoingQueues();
+ messageManager.clearOutgoingMessages();
assertTrue(messageManager.getNumCurrentMessages() == 1);
IntWritable currentMessage = messageManager.getCurrentMessage();
Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Tue Jan
21 01:28:58 2014
@@ -253,8 +253,8 @@ public class TestPipes extends HamaClust
bsp.setPartitioner(PipesPartitioner.class);
// sort sent messages
- bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer");
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.SortedMemoryQueue");
bsp.set("hama.mat.mult.B.path", transposedMatrixB.toString());
return bsp;
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue Jan
21 01:28:58 2014
@@ -32,8 +32,7 @@ import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferProtocol;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer;
+import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
import com.google.common.base.Preconditions;
@@ -104,15 +103,15 @@ public class GraphJob extends BSPJob {
}
/**
- * Custom aggregator registration. Add a custom aggregator
- * that will aggregate massages sent from the user.
- *
- * @param name identifies an aggregator
- * @param aggregatorClass the aggregator class
- */
+ * Custom aggregator registration. Add a custom aggregator that will
aggregate
+ * massages sent from the user.
+ *
+ * @param name identifies an aggregator
+ * @param aggregatorClass the aggregator class
+ */
@SuppressWarnings("rawtypes")
- public void registerAggregator(String name, Class<? extends
- Aggregator> aggregatorClass) {
+ public void registerAggregator(String name,
+ Class<? extends Aggregator> aggregatorClass) {
String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
prevAggrs += name + "@" + aggregatorClass.getName() + ";";
@@ -197,8 +196,8 @@ public class GraphJob extends BSPJob {
}
// add the default message queue to the sorted one
- this.getConfiguration().setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
- SortedMemoryQueueTransfer.class, MessageTransferProtocol.class);
+ this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ SortedMemoryQueue.class, MessageQueue.class);
super.submit();
}