Author: edwardyoon
Date: Mon Jan 13 02:24:48 2014
New Revision: 1557644
URL: http://svn.apache.org/r1557644
Log:
HAMA-842: Add persistent queue option to JobConf (edwardyoon)
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
(with props)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 13 02:24:48 2014
@@ -3,7 +3,8 @@ Hama Change Log
Release 0.7.0 (unreleased changes)
NEW FEATURES
-
+
+ HAMA-842: Add persistent queue option to JobConf (edwardyoon)
HAMA-839: Support NullWritable in Hama Pipes (Martin Illecker)
HAMA-837: Add sort behaviour to runtime partitioner (edwardyoon)
HAMA-827: Add NamedVector (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Mon Jan 13
02:24:48 2014
@@ -31,6 +31,7 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.bsp.message.queue.MessageQueue;
/**
* A BSP job configuration.
@@ -411,4 +412,9 @@ public class BSPJob extends BSPJobContex
public void setCompressionThreshold(long ct) {
conf.setLong("hama.messenger.compression.threshold", ct);
}
+
+  /**
+   * Selects the message queue behaviour for this job.
+   * <p>
+   * Only {@link MessageQueue#PERSISTENT_QUEUE} is recognised: it switches the
+   * persistent-queue flag on in the job configuration. Any other value,
+   * including {@code null}, leaves the configuration untouched.
+   *
+   * @param queueBehaviour the requested behaviour, normally
+   *          {@link MessageQueue#PERSISTENT_QUEUE}
+   */
+  public void setMessageQueueBehaviour(String queueBehaviour) {
+    // Constant-first equals() so that a null argument is simply ignored
+    // instead of raising a NullPointerException.
+    if (MessageQueue.PERSISTENT_QUEUE.equals(queueBehaviour)) {
+      conf.setBoolean(MessageQueue.PERSISTENT_QUEUE, true);
+    }
+  }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Mon Jan 13 02:24:48 2014
@@ -65,7 +65,7 @@ public abstract class AbstractMessageMan
protected SynchronizedQueue<M> localQueueForNextIteration;
// this peer object is just used for counter incrementation
protected BSPPeer<?, ?, ?, ?, M> peer;
-
+
// the task attempt id
protected TaskAttemptID attemptId;
@@ -155,10 +155,38 @@ public abstract class AbstractMessageMan
*/
@Override
public final void clearOutgoingQueues() {
- if (localQueue != null) {
- localQueue.close();
+ if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
+ && localQueue.size() > 0) {
+
+ if (localQueue.isMemoryBasedQueue()
+ && localQueueForNextIteration.isMemoryBasedQueue()) {
+
+ // To reduce the number of element additions
+ if (localQueue.size() > localQueueForNextIteration.size()) {
+ localQueue.addAll(localQueueForNextIteration);
+ } else {
+ localQueueForNextIteration.addAll(localQueue);
+ localQueue = localQueueForNextIteration.getMessageQueue();
+ }
+
+ } else {
+
+ // TODO find the way to switch disk-based queue efficiently.
+ localQueueForNextIteration.addAll(localQueue);
+ if (localQueue != null) {
+ localQueue.close();
+ }
+ localQueue = localQueueForNextIteration.getMessageQueue();
+
+ }
+ } else {
+ if (localQueue != null) {
+ localQueue.close();
+ }
+
+ localQueue = localQueueForNextIteration.getMessageQueue();
}
- localQueue = localQueueForNextIteration.getMessageQueue();
+
localQueue.prepareRead();
localQueueForNextIteration = getSynchronizedReceiverQueue();
notifyInit();
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
Mon Jan 13 02:24:48 2014
@@ -51,6 +51,7 @@ public class MessageTransferQueueFactory
}
+ @SuppressWarnings("rawtypes")
public static MessageTransferQueue getMessageTransferQueue(Configuration
conf) {
return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass(
MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
Mon Jan 13 02:24:48 2014
@@ -308,4 +308,9 @@ public final class DiskQueue<M extends W
public boolean isMessageSerialized() {
return false;
}
+
+ /**
+ * Always returns false: this queue reports itself as not memory-based.
+ * AbstractMessageManager#clearOutgoingQueues only merges queues in place
+ * when both of them report true here.
+ */
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return false;
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
Mon Jan 13 02:24:48 2014
@@ -107,4 +107,9 @@ public final class MemoryQueue<M extends
public boolean isMessageSerialized() {
return false;
}
+
+ /**
+ * Always returns true: this queue reports itself as memory-based.
+ */
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return true;
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
Mon Jan 13 02:24:48 2014
@@ -26,6 +26,8 @@ import org.apache.hama.bsp.TaskAttemptID
*/
public interface MessageQueue<M> extends Iterable<M>, Configurable {
+ /**
+ * Configuration key of the persistent-queue option. It is set to true by
+ * BSPJob#setMessageQueueBehaviour and read (default false) by
+ * AbstractMessageManager#clearOutgoingQueues.
+ */
+ public static final String PERSISTENT_QUEUE =
"hama.queue.behaviour.persistent";
+
/**
* Used to initialize the queue.
*/
@@ -83,5 +85,7 @@ public interface MessageQueue<M> extends
* @return true if the messages in the queue are serialized to byte buffers.
*/
public boolean isMessageSerialized();
+
+ /**
+ * @return true if this queue reports itself as memory-based (as MemoryQueue
+ * and SortedMessageQueue do), false otherwise (as DiskQueue and
+ * SpillingQueue do).
+ */
+ public boolean isMemoryBasedQueue();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
Mon Jan 13 02:24:48 2014
@@ -211,4 +211,9 @@ public final class SingleLockQueue<T> im
return queue.isMessageSerialized();
}
}
+
+ /**
+ * Always returns true, regardless of the wrapped queue.
+ * NOTE(review): the neighbouring isMessageSerialized() delegates to the
+ * wrapped queue, while this answer is hard-coded -- confirm whether it
+ * should delegate to queue.isMemoryBasedQueue() instead.
+ */
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return true;
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
Mon Jan 13 02:24:48 2014
@@ -117,4 +117,9 @@ public final class SortedMessageQueue<M
addAll((POJOMessageBundle<M>) bundle);
}
+ /**
+ * Always returns true: this queue reports itself as memory-based.
+ */
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return true;
+ }
+
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
Mon Jan 13 02:24:48 2014
@@ -177,8 +177,10 @@ public class SpillingQueue<M extends Wri
try {
this.spillOutputBuffer.close();
- this.spilledInput.close();
- this.spilledInput.completeReading(true);
+ if (this.spilledInput != null) {
+ this.spilledInput.close();
+ this.spilledInput.completeReading(true);
+ }
} catch (IOException e) {
LOG.error("Error closing the spilled input stream.", e);
throw new RuntimeException(e);
@@ -362,4 +364,9 @@ public class SpillingQueue<M extends Wri
}
}
+ /**
+ * Always returns false: this queue reports itself as not memory-based.
+ */
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return false;
+ }
+
}
Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1557644&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
(added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Mon
Jan 13 02:24:48 2014
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.sync.SyncException;
+
+/**
+ * Runs a small two-task BSP job with the persistent-queue option switched on,
+ * once for each message transfer queue implementation, and checks that
+ * messages left unread in one superstep are still available later.
+ */
+public class TestPersistQueue extends TestCase {
+
+  // Logger is bound to this test class (it previously pointed at
+  // TestPartitioning, a copy-paste slip that mislabelled the log output).
+  public static final Log LOG = LogFactory.getLog(TestPersistQueue.class);
+
+  public void testDiskQueue() throws Exception {
+    assertJobSucceedsWith(
+        "org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue");
+  }
+
+  public void testMemoryQueue() throws Exception {
+    assertJobSucceedsWith(
+        "org.apache.hama.bsp.message.queue.MemoryTransferProtocol");
+  }
+
+  public void testSortedQueue() throws Exception {
+    assertJobSucceedsWith(
+        "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
+  }
+
+  public void testSpillingQueue() throws Exception {
+    assertJobSucceedsWith(
+        "org.apache.hama.bsp.message.queue.SpillingQueueTransferProtocol");
+  }
+
+  /**
+   * Builds a fresh job, selects the given transfer queue class and asserts
+   * that the job runs to successful completion.
+   *
+   * @param transferQueueClass fully qualified name stored under
+   *          {@link MessageManager#TRANSFER_QUEUE_TYPE_CLASS}
+   */
+  private void assertJobSucceedsWith(String transferQueueClass)
+      throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS, transferQueueClass);
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  /**
+   * Creates a two-task job without input or output that runs
+   * {@link persistentMsgBSP} with the persistent-queue behaviour enabled.
+   *
+   * @return the configured, not yet submitted job
+   */
+  public BSPJob getNewJobConf() throws Exception {
+    Configuration conf = new Configuration();
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test persistent behaviour");
+    bsp.setBspClass(persistentMsgBSP.class);
+    bsp.setNumBspTask(2);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setMessageQueueBehaviour(MessageQueue.PERSISTENT_QUEUE);
+    return bsp;
+  }
+
+  /**
+   * In each of 10 supersteps every task sends one message to peer 0 and one
+   * to peer 1, and reads a single message only when the superstep count is
+   * even. The test expects 15 messages to remain readable at the end.
+   */
+  public static class persistentMsgBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable, IntWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, IntWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+
+      for (int i = 0; i < 10; i++) {
+        peer.send(peer.getPeerName(0), new IntWritable(i));
+        peer.send(peer.getPeerName(1), new IntWritable(i));
+        peer.sync();
+
+        // Consume one message on even supersteps only; the rest must be
+        // carried over by the persistent queue.
+        if ((peer.getSuperstepCount() % 2) == 0) {
+          peer.getCurrentMessage();
+        }
+      }
+
+      // Drain whatever is still queued after the last superstep.
+      int cnt = 0;
+      while (peer.getCurrentMessage() != null) {
+        cnt++;
+      }
+
+      // assertEquals reports the actual count on failure, unlike
+      // assertTrue(cnt == 15).
+      assertEquals(15, cnt);
+    }
+  }
+}
Propchange:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
Mon Jan 13 02:24:48 2014
@@ -145,6 +145,7 @@ public class BipartiteMatchingTest exten
@Test
public void testBipartiteMatching() throws IOException, InterruptedException,
ClassNotFoundException {
+ deleteTempDirs();
generateTestData();
try {
String seed = "2";