Author: edwardyoon
Date: Mon Jan 13 02:24:48 2014
New Revision: 1557644

URL: http://svn.apache.org/r1557644
Log:
HAMA-842: Add persistent queue option to JobConf (edwardyoon)

Added:
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java   (with props)
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java

Modified: hama/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 13 02:24:48 2014
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
-
+  
+   HAMA-842: Add persistent queue option to JobConf (edwardyoon)
    HAMA-839: Support NullWritable in Hama Pipes (Martin Illecker)    
    HAMA-837: Add sort behaviour to runtime partitioner (edwardyoon)
    HAMA-827: Add NamedVector (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Mon Jan 13 
02:24:48 2014
@@ -31,6 +31,7 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+import org.apache.hama.bsp.message.queue.MessageQueue;
 
 /**
  * A BSP job configuration.
@@ -411,4 +412,9 @@ public class BSPJob extends BSPJobContex
   public void setCompressionThreshold(long ct) {
     conf.setLong("hama.messenger.compression.threshold", ct);
   }
+
+  public void setMessageQueueBehaviour(String queueBehaviour) { // opts the job into persistent queues
+    if (MessageQueue.PERSISTENT_QUEUE.equals(queueBehaviour)) // constant-first: null is ignored, no NPE
+      conf.setBoolean(MessageQueue.PERSISTENT_QUEUE, true); // any other value leaves conf untouched
+  }
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
 Mon Jan 13 02:24:48 2014
@@ -65,7 +65,7 @@ public abstract class AbstractMessageMan
   protected SynchronizedQueue<M> localQueueForNextIteration;
   // this peer object is just used for counter incrementation
   protected BSPPeer<?, ?, ?, ?, M> peer;
-  
+
   // the task attempt id
   protected TaskAttemptID attemptId;
 
@@ -155,10 +155,38 @@ public abstract class AbstractMessageMan
    */
   @Override
   public final void clearOutgoingQueues() {
-    if (localQueue != null) {
-      localQueue.close();
+    if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
+        && localQueue.size() > 0) {
+
+      if (localQueue.isMemoryBasedQueue()
+          && localQueueForNextIteration.isMemoryBasedQueue()) {
+
+        // To reduce the number of element additions
+        if (localQueue.size() > localQueueForNextIteration.size()) {
+          localQueue.addAll(localQueueForNextIteration);
+        } else {
+          localQueueForNextIteration.addAll(localQueue);
+          localQueue = localQueueForNextIteration.getMessageQueue();
+        }
+
+      } else {
+
+        // TODO find the way to switch disk-based queue efficiently.
+        localQueueForNextIteration.addAll(localQueue);
+        if (localQueue != null) {
+          localQueue.close();
+        }
+        localQueue = localQueueForNextIteration.getMessageQueue();
+
+      }
+    } else {
+      if (localQueue != null) {
+        localQueue.close();
+      }
+
+      localQueue = localQueueForNextIteration.getMessageQueue();
     }
-    localQueue = localQueueForNextIteration.getMessageQueue();
+
     localQueue.prepareRead();
     localQueueForNextIteration = getSynchronizedReceiverQueue();
     notifyInit();

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
 Mon Jan 13 02:24:48 2014
@@ -51,6 +51,7 @@ public class MessageTransferQueueFactory
 
   }
 
+  @SuppressWarnings("rawtypes")
   public static MessageTransferQueue getMessageTransferQueue(Configuration 
conf) {
     return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass(
         MessageManager.TRANSFER_QUEUE_TYPE_CLASS,

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java 
(original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java 
Mon Jan 13 02:24:48 2014
@@ -308,4 +308,9 @@ public final class DiskQueue<M extends W
   public boolean isMessageSerialized() {
     return false;
   }
+
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return false; // read by AbstractMessageManager.clearOutgoingQueues() when merging persistent queues
+  }
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
 Mon Jan 13 02:24:48 2014
@@ -107,4 +107,9 @@ public final class MemoryQueue<M extends
   public boolean isMessageSerialized() {
     return false;
   }
+
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return true; // read by AbstractMessageManager.clearOutgoingQueues() when merging persistent queues
+  }
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
 Mon Jan 13 02:24:48 2014
@@ -26,6 +26,8 @@ import org.apache.hama.bsp.TaskAttemptID
  */
 public interface MessageQueue<M> extends Iterable<M>, Configurable {
 
+  public static final String PERSISTENT_QUEUE = 
"hama.queue.behaviour.persistent";
+
   /**
    * Used to initialize the queue.
    */
@@ -83,5 +85,7 @@ public interface MessageQueue<M> extends
    * @return true if the messages in the queue are serialized to byte buffers.
    */
   public boolean isMessageSerialized();
+  
+  public boolean isMemoryBasedQueue(); // true for MemoryQueue/SortedMessageQueue, false for DiskQueue/SpillingQueue
 
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
 Mon Jan 13 02:24:48 2014
@@ -211,4 +211,9 @@ public final class SingleLockQueue<T> im
       return queue.isMessageSerialized();
     }
   }
+
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return queue.isMemoryBasedQueue(); // report the wrapped queue's answer, like isMessageSerialized()
+  }
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
 Mon Jan 13 02:24:48 2014
@@ -117,4 +117,9 @@ public final class SortedMessageQueue<M 
     addAll((POJOMessageBundle<M>) bundle);
   }
 
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return true; // read by AbstractMessageManager.clearOutgoingQueues() when merging persistent queues
+  }
+
 }

Modified: 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
 (original)
+++ 
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
 Mon Jan 13 02:24:48 2014
@@ -177,8 +177,10 @@ public class SpillingQueue<M extends Wri
 
     try {
       this.spillOutputBuffer.close();
-      this.spilledInput.close();
-      this.spilledInput.completeReading(true);
+      if (this.spilledInput != null) {
+        this.spilledInput.close();
+        this.spilledInput.completeReading(true);
+      }
     } catch (IOException e) {
       LOG.error("Error closing the spilled input stream.", e);
       throw new RuntimeException(e);
@@ -362,4 +364,9 @@ public class SpillingQueue<M extends Wri
     }
   }
 
+  @Override
+  public boolean isMemoryBasedQueue() {
+    return false; // read by AbstractMessageManager.clearOutgoingQueues() when merging persistent queues
+  }
+
 }

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1557644&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java 
(added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Mon 
Jan 13 02:24:48 2014
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.sync.SyncException;
+
+public class TestPersistQueue extends TestCase { // one persistent-queue BSP job per transfer queue type
+
+  public static final Log LOG = LogFactory.getLog(TestPersistQueue.class); // log under this test's own name
+
+  public void testDiskQueue() throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue");
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public void testMemoryQueue() throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.MemoryTransferProtocol");
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public void testSortedQueue() throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public void testSpillingQueue() throws Exception {
+    BSPJob bsp = getNewJobConf();
+    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SpillingQueueTransferProtocol");
+
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public BSPJob getNewJobConf() throws Exception { // shared job setup; callers only pick the queue type
+    Configuration conf = new Configuration();
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test persistent behaviour");
+    bsp.setBspClass(persistentMsgBSP.class);
+    bsp.setNumBspTask(2); // bsp() below addresses peers 0 and 1
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setMessageQueueBehaviour(MessageQueue.PERSISTENT_QUEUE); // the option under test
+    return bsp;
+  }
+
+  public static class persistentMsgBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable, IntWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, IntWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+
+      for (int i = 0; i < 10; i++) { // ten supersteps
+        peer.send(peer.getPeerName(0), new IntWritable(i)); // each peer sends one message to peer 0 ...
+        peer.send(peer.getPeerName(1), new IntWritable(i)); // ... and one to peer 1, every superstep
+        peer.sync();
+
+        if ((peer.getSuperstepCount() % 2) == 0) {
+          peer.getCurrentMessage(); // consume a single message; the rest must survive the next sync
+        }
+      }
+
+      int cnt = 0;
+      while ((peer.getCurrentMessage()) != null) { // drain whatever the persistent queue kept
+        cnt++;
+      }
+
+      assertEquals(15, cnt); // 20 received minus 5 consumed, assuming 5 of the 10 superstep counts are even
+    }
+  }
+}

Propchange: 
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: 
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1557644&r1=1557643&r2=1557644&view=diff
==============================================================================
--- 
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
 (original)
+++ 
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
 Mon Jan 13 02:24:48 2014
@@ -145,6 +145,7 @@ public class BipartiteMatchingTest exten
   @Test
   public void testBipartiteMatching() throws IOException, InterruptedException,
       ClassNotFoundException {
+    deleteTempDirs();
     generateTestData();
     try {
       String seed = "2";


Reply via email to