Author: edwardyoon
Date: Wed Mar 12 00:59:00 2014
New Revision: 1576554
URL: http://svn.apache.org/r1576554
Log:
HAMA-568: Add faster synchronized collections for message queues
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Mar 12 00:59:00 2014
@@ -11,6 +11,8 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-568: Add faster synchronized collections for message queues
(edwardyoon)
+
Release 0.6.4 - Mar 5, 2014
NEW FEATURES
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
Wed Mar 12 00:59:00 2014
@@ -17,9 +17,8 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.ArrayDeque;
-import java.util.Deque;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -28,9 +27,10 @@ import org.apache.hama.bsp.TaskAttemptID
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
-public final class MemoryQueue<M extends Writable> extends POJOMessageQueue<M>
{
+public final class MemoryQueue<M extends Writable> implements
+ SynchronizedQueue<M> {
- private final Deque<M> deque = new ArrayDeque<M>();
+ private final ConcurrentLinkedQueue<M> deque = new
ConcurrentLinkedQueue<M>();
private Configuration conf;
@Override
@@ -112,4 +112,9 @@ public final class MemoryQueue<M extends
public boolean isMemoryBasedQueue() {
return true;
}
+
+ @Override
+ public MessageQueue<M> getMessageQueue() {
+ return this;
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
Wed Mar 12 00:59:00 2014
@@ -181,8 +181,11 @@ public final class SingleLockQueue<T> im
/*
* static constructor methods to be type safe
*/
-
public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+ if(queue.isMemoryBasedQueue()) {
+ return (SynchronizedQueue<T>) queue;
+ }
+
return new SingleLockQueue<T>(queue);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Wed Mar 12 00:59:00 2014
@@ -18,7 +18,8 @@
package org.apache.hama.bsp.message.queue;
import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
@@ -31,9 +32,9 @@ import org.apache.hama.bsp.message.bundl
* sorted receive and send.
*/
public final class SortedMemoryQueue<M extends WritableComparable<M>>
- implements MessageQueue<M>, BSPMessageInterface<M> {
+ implements SynchronizedQueue<M>, BSPMessageInterface<M> {
- private final PriorityQueue<M> queue = new PriorityQueue<M>();
+ private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
private Configuration conf;
@Override
@@ -94,7 +95,7 @@ public final class SortedMemoryQueue<M e
@Override
public void close() {
- this.clear();;
+ this.clear();
}
@Override
@@ -122,4 +123,9 @@ public final class SortedMemoryQueue<M e
return true;
}
+ @Override
+ public MessageQueue<M> getMessageQueue() {
+ return this;
+ }
+
}