Author: edwardyoon
Date: Tue Aug 19 06:38:49 2014
New Revision: 1618812
URL: http://svn.apache.org/r1618812
Log:
revert to 1617395
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
- copied from r1617395,
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
- copied unchanged from r1617395,
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
- copied unchanged from r1617395,
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
- copied unchanged from r1617395,
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
- copied unchanged from r1617395,
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
- copied unchanged from r1617395,
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
- copied unchanged from r1617395,
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
hama/trunk/conf/hama-env.sh
hama/trunk/conf/hama-site.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
Modified: hama/trunk/conf/hama-env.sh
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-env.sh?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/conf/hama-env.sh (original)
+++ hama/trunk/conf/hama-env.sh Tue Aug 19 06:38:49 2014
@@ -22,10 +22,10 @@
# Set environment variables here.
# The java implementation to use. Required.
-# export JAVA_HOME=/usr/lib/jvm/java-7-oracle
+export JAVA_HOME=/usr/lib/jvm/java-7-oracle
# Where log files are stored. $HAMA_HOME/logs by default.
-# export HAMA_LOG_DIR=${HAMA_HOME}/logs
+export HAMA_LOG_DIR=${HAMA_HOME}/logs
# The maximum amount of heap to use, in MB. Default is 1000.
# export HAMA_HEAPSIZE=1000
Modified: hama/trunk/conf/hama-site.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-site.xml?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/conf/hama-site.xml (original)
+++ hama/trunk/conf/hama-site.xml Tue Aug 19 06:38:49 2014
@@ -22,4 +22,34 @@
*/
-->
<configuration>
+
+ <property>
+ <name>bsp.master.address</name>
+ <value>edward-VirtualBox:40000</value>
+ <description>The address of the bsp master server. Either the
+ literal string "local" or a host:port for distributed mode
+ </description>
+ </property>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>hdfs://edward-VirtualBox:9000/</value>
+ <description>
+ The name of the default file system. Either the literal string
+ "local" or a host:port for HDFS.
+ </description>
+ </property>
+
+ <property>
+ <name>hama.zookeeper.quorum</name>
+ <value>edward-VirtualBox</value>
+ <description>Comma separated list of servers in the ZooKeeper Quorum.
+ For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+ By default this is set to localhost for local and pseudo-distributed modes
+ of operation. For a fully-distributed setup, this should be set to a full
+ list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+ this is the list of servers which we will start/stop zookeeper on.
+ </description>
+ </property>
+
</configuration>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Aug 19 06:38:49 2014
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
@@ -355,9 +356,12 @@ public class LocalBSPRunner implements J
bundle.setCompressor(compressor,
conf.getLong("hama.messenger.compression.threshold", 512));
- MANAGER_MAP.get(addr).localQueueForNextIteration.add(bundle);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- bundle.size());
+ Iterator<M> it = bundle.iterator();
+ while (it.hasNext()) {
+ MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ 1L);
+ }
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Aug 19 06:38:49 2014
@@ -232,12 +232,7 @@ public abstract class AbstractMessageMan
}
protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
- MessageQueue<M> queue = getReceiverQueue();
- if (queue.isMemoryBasedQueue()) {
- return (SynchronizedQueue<M>) queue;
- }
-
- return new SingleLockQueue<M>(queue);
+ return SingleLockQueue.synchronize(getReceiverQueue());
}
@Override
@@ -286,9 +281,11 @@ public abstract class AbstractMessageMan
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
bundle.setCompressor(compressor,
conf.getLong("hama.messenger.compression.threshold", 128));
- this.localQueueForNextIteration.add(bundle);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- bundle.size());
+
+ Iterator<? extends Writable> it = bundle.iterator();
+ while (it.hasNext()) {
+ loopBackMessage(it.next());
+ }
}
@SuppressWarnings("unchecked")
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Tue Aug 19 06:38:49 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
@@ -46,7 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
* configuration. <br/>
* <b>It is experimental to use.</b>
*/
-public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
@@ -171,14 +170,7 @@ public final class DiskQueue<M extends W
}
@Override
- public void add(BSPMessageBundle<M> bundle){
- addAll(bundle);
- }
-
- @Override
public final void addAll(Iterable<M> col) {
- // TODO Write bundle object directly
-
for (M item : col) {
add(item);
}
@@ -321,5 +313,4 @@ public final class DiskQueue<M extends W
public boolean isMemoryBasedQueue() {
return false;
}
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Aug 19 06:38:49 2014
@@ -22,104 +22,54 @@ import java.util.concurrent.ConcurrentLi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
public final class MemoryQueue<M extends Writable> implements
- SynchronizedQueue<M>, MessageQueue<M> {
-
- private final ConcurrentLinkedQueue<M> messages = new ConcurrentLinkedQueue<M>();
- private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
- private Iterator<M> bundleIterator;
-
- int bundledMessageSize = 0;
+ SynchronizedQueue<M> {
+ private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
private Configuration conf;
@Override
public final void addAll(Iterable<M> col) {
for (M m : col)
- messages.add(m);
+ deque.add(m);
}
@Override
public void addAll(MessageQueue<M> otherqueue) {
M poll = null;
while ((poll = otherqueue.poll()) != null) {
- messages.add(poll);
+ deque.add(poll);
}
}
@Override
public final void add(M item) {
- messages.add(item);
- }
-
- @Override
- public void add(BSPMessageBundle<M> bundle) {
- bundledMessageSize += bundle.size();
- bundles.add(bundle);
+ deque.add(item);
}
@Override
public final void clear() {
- messages.clear();
- bundles.clear();
- bundleIterator = null;
+ deque.clear();
}
@Override
public final M poll() {
- if (messages.size() > 0) {
- return messages.poll();
- } else {
- if (bundles.size() > 0) {
- if (bundleIterator == null) {
- bundleIterator = bundles.poll().iterator();
- } else {
- if (!bundleIterator.hasNext()) {
- bundleIterator = bundles.poll().iterator();
- }
- }
-
- bundledMessageSize--;
- return bundleIterator.next();
- }
- }
-
- return null;
+ return deque.poll();
}
@Override
public final int size() {
- return messages.size() + bundledMessageSize;
+ return deque.size();
}
@Override
public final Iterator<M> iterator() {
- Iterator<M> it = new Iterator<M>() {
-
- @Override
- public boolean hasNext() {
- if (size() > 0)
- return true;
- else
- return false;
- }
-
- @Override
- public M next() {
- return poll();
- }
-
- @Override
- public void remove() {
- }
- };
- return it;
+ return deque.iterator();
}
@Override
@@ -167,5 +117,4 @@ public final class MemoryQueue<M extends
public MessageQueue<M> getMessageQueue() {
return this;
}
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Tue Aug 19 06:38:49 2014
@@ -19,23 +19,17 @@ package org.apache.hama.bsp.message.queu
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* Simple queue interface.
*/
-public interface MessageQueue<M extends Writable> extends Iterable<M>,
- Configurable {
+public interface MessageQueue<M> extends Iterable<M>, Configurable {
public static final String PERSISTENT_QUEUE =
"hama.queue.behaviour.persistent";
/**
* Used to initialize the queue.
- *
- * @param conf
- * @param id
*/
public void init(Configuration conf, TaskAttemptID id);
@@ -56,33 +50,20 @@ public interface MessageQueue<M extends
/**
* Adds a whole Java Collection to the implementing queue.
- *
- * @param col
*/
public void addAll(Iterable<M> col);
/**
* Adds the other queue to this queue.
- *
- * @param otherqueue
*/
public void addAll(MessageQueue<M> otherqueue);
/**
* Adds a single item to the implementing queue.
- *
- * @param item
*/
public void add(M item);
/**
- * Adds a bundle to the queue.
- *
- * @param bundle
- */
- public void add(BSPMessageBundle<M> bundle);
-
- /**
* Clears all entries in the given queue.
*/
public void clear();
@@ -104,10 +85,7 @@ public interface MessageQueue<M extends
* @return true if the messages in the queue are serialized to byte buffers.
*/
public boolean isMessageSerialized();
-
- /**
- * @return true if the queue is memory resident.
- */
+
public boolean isMemoryBasedQueue();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Tue Aug 19 06:38:49 2014
@@ -20,24 +20,22 @@ package org.apache.hama.bsp.message.queu
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* A global mutex based synchronized queue.
*/
-public final class SingleLockQueue<T extends Writable> implements SynchronizedQueue<T> {
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
private final MessageQueue<T> queue;
private final Object mutex;
- public SingleLockQueue(MessageQueue<T> queue) {
+ private SingleLockQueue(MessageQueue<T> queue) {
this.queue = queue;
this.mutex = new Object();
}
- public SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+ private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
this.queue = queue;
this.mutex = mutex;
}
@@ -180,6 +178,22 @@ public final class SingleLockQueue<T ext
}
}
+ /*
+ * static constructor methods to be type safe
+ */
+ public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+ if(queue.isMemoryBasedQueue()) {
+ return (SynchronizedQueue<T>) queue;
+ }
+
+ return new SingleLockQueue<T>(queue);
+ }
+
+ public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+ Object mutex) {
+ return new SingleLockQueue<T>(queue, mutex);
+ }
+
@Override
public void prepareWrite() {
synchronized (mutex) {
@@ -205,9 +219,4 @@ public final class SingleLockQueue<T ext
public boolean isMemoryBasedQueue() {
return true;
}
-
- @Override
- public void add(BSPMessageBundle<T> bundle) {
- queue.add(bundle);
- }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Tue Aug 19 06:38:49 2014
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
* sorted receive and send.
*/
public final class SortedMemoryQueue<M extends WritableComparable<M>>
- implements SynchronizedQueue<M>, MessageQueue<M> {
+ implements SynchronizedQueue<M>, BSPMessageInterface<M> {
private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
private Configuration conf;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Tue Aug 19 06:38:49 2014
@@ -17,13 +17,11 @@
*/
package org.apache.hama.bsp.message.queue;
-import org.apache.hadoop.io.Writable;
-
/**
* Synchronized Queue interface. Can be used to implement better synchronized
* datastructures.
*/
-public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T> {
+public interface SynchronizedQueue<T> extends MessageQueue<T> {
public abstract MessageQueue<T> getMessageQueue();
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Aug 19 06:38:49 2014
@@ -37,6 +37,14 @@ public class TestPersistQueue extends Te
public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
+ public void testDiskQueue() throws Exception {
+ BSPJob bsp = getNewJobConf();
+ bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+ "org.apache.hama.bsp.message.queue.DiskQueue");
+
+ assertTrue(bsp.waitForCompletion(true));
+ }
+
public void testMemoryQueue() throws Exception {
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -53,10 +61,10 @@ public class TestPersistQueue extends Te
assertTrue(bsp.waitForCompletion(true));
}
- public void testDiskQueue() throws Exception {
+ public void testSpillingQueue() throws Exception {
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.DiskQueue");
+ "org.apache.hama.bsp.message.queue.SpillingQueue");
assertTrue(bsp.waitForCompletion(true));
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1618812&r1=1618811&r2=1618812&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Tue Aug 19 06:38:49 2014
@@ -49,6 +49,7 @@ public class TestHamaMessageManager exte
HamaConfiguration conf = new HamaConfiguration();
conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
+ conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
messagingInternal(conf);
}
@@ -59,7 +60,7 @@ public class TestHamaMessageManager exte
MessageQueue.class);
messagingInternal(conf);
}
-
+
private static void messagingInternal(HamaConfiguration conf)
throws Exception {
conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
@@ -101,10 +102,10 @@ public class TestHamaMessageManager exte
}
messageManager.transfer(peer, bundle);
-
+
messageManager.clearOutgoingMessages();
- assertEquals(messageManager.getNumCurrentMessages(), 1);
+ assertTrue(messageManager.getNumCurrentMessages() == 1);
IntWritable currentMessage = messageManager.getCurrentMessage();
assertEquals(currentMessage.get(), 1337);