Author: tjungblut Date: Sat May 19 08:32:28 2012 New Revision: 1340371 URL: http://svn.apache.org/viewvc?rev=1340371&view=rev Log: [HAMA-566]: Add a disk-based queue
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Sat May 19 08:32:28 2012 @@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012 NEW FEATURES + HAMA-566: Add disk-based queue (tjungblut) HAMA-552: Add a sorted message queue (tjungblut) HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut) HAMA-508: Add clean plugin (Mikalai Parafeniuk via edwardyoon) Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat May 19 08:32:28 2012 @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSPPeer; @@ -35,10 +36,15 @@ import org.apache.hama.bsp.BSPPeerImpl; import org.apache.hama.bsp.TaskAttemptID; import org.apache.hama.util.BSPNetUtils; +/** + * Abstract baseclass that should contain all information and services needed + * for the concrete RPC subclasses. For example it manages how the queues are + * managed and it maintains a cache for socket addresses. + */ public abstract class AbstractMessageManager<M extends Writable> implements MessageManager<M>, Configurable { - private static final Log LOG = LogFactory + protected static final Log LOG = LogFactory .getLog(AbstractMessageManager.class); // conf is injected via reflection of the factory @@ -48,12 +54,19 @@ public abstract class AbstractMessageMan protected MessageQueue<M> localQueue; // this must be a synchronized implementation: this is accessed per RPC protected SynchronizedQueue<M> localQueueForNextIteration; + // this peer object is just used for counter incrementation protected BSPPeer<?, ?, ?, ?, M> peer; // the peer address of this peer protected InetSocketAddress peerAddress; // the task attempt id protected TaskAttemptID attemptId; + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp. + * TaskAttemptID, org.apache.hama.bsp.BSPPeer, + * org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress) + */ @Override public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) { @@ -62,11 +75,13 @@ public abstract class AbstractMessageMan this.conf = conf; this.peerAddress = peerAddress; localQueue = getQueue(); - localQueue.init(conf, attemptId); localQueueForNextIteration = getSynchronizedQueue(); - localQueueForNextIteration.init(conf, attemptId); } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#close() + */ @Override public void close() { Collection<MessageQueue<M>> values = outgoingQueues.values(); @@ -74,8 +89,21 @@ public abstract class AbstractMessageMan msgQueue.close(); } localQueue.close(); + // remove possible disk queues from the path + try { + FileSystem.get(conf).delete( + DiskQueue.getQueueDir(conf, attemptId, + conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true); + } catch (IOException e) { + LOG.warn("Queue dir couldn't be deleted"); + } + } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#finishSendPhase() + */ @Override public void finishSendPhase() throws IOException { Collection<MessageQueue<M>> values = outgoingQueues.values(); @@ -84,29 +112,42 @@ public abstract class AbstractMessageMan } } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage() + */ @Override public final M getCurrentMessage() throws IOException { return localQueue.poll(); } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages() + */ @Override public final int getNumCurrentMessages() { return localQueue.size(); } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#clearOutgoingQueues() + */ @Override public final void clearOutgoingQueues() { - this.outgoingQueues.clear(); - localQueueForNextIteration.prepareRead(); - localQueue.prepareWrite(); - localQueue.addAll(localQueueForNextIteration.getMessageQueue()); + localQueue = localQueueForNextIteration.getMessageQueue(); localQueue.prepareRead(); - localQueueForNextIteration.clear(); + localQueueForNextIteration = getSynchronizedQueue(); } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#send(java.lang.String, + * org.apache.hadoop.io.Writable) + */ @Override public void send(String peerName, M msg) throws IOException { - LOG.debug("Send message (" + msg.toString() + ") to " + peerName); InetSocketAddress targetPeerAddress = null; // Get socket for target peer. if (peerSocketCache.containsKey(peerName)) { @@ -124,6 +165,10 @@ public abstract class AbstractMessageMan outgoingQueues.put(targetPeerAddress, queue); } + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator() + */ @Override public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() { return this.outgoingQueues.entrySet().iterator(); @@ -132,12 +177,14 @@ public abstract class AbstractMessageMan /** * Returns a new queue implementation based on what was configured. If nothing * has been configured for "hama.messenger.queue.class" then the - * {@link MemoryQueue} is used. + * {@link MemoryQueue} is used. If you have scalability issues, then better + * use {@link DiskQueue}. * * @return a <b>new</b> queue implementation. */ protected MessageQueue<M> getQueue() { Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class); + LOG.debug("Creating new " + queueClass); @SuppressWarnings("unchecked") MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils .newInstance(queueClass, conf); @@ -146,13 +193,15 @@ public abstract class AbstractMessageMan } protected SynchronizedQueue<M> getSynchronizedQueue() { - return SynchronizedQueue.synchronize(getQueue()); + return SingleLockQueue.synchronize(getQueue()); } + @Override public final Configuration getConf() { return conf; } + @Override public final void setConf(Configuration conf) { this.conf = conf; } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Sat May 19 08:32:28 2012 @@ -31,11 +31,13 @@ public final class AvroBSPMessageBundle< @Deprecated public java.nio.ByteBuffer data; + @Override public final org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. + @Override public final java.lang.Object get(int field$) { switch (field$) { case 0: @@ -46,6 +48,7 @@ public final class AvroBSPMessageBundle< } // Used by DatumReader. Applications should not call. + @Override public final void put(int field$, java.lang.Object value$) { switch (field$) { case 0: @@ -116,12 +119,12 @@ public final class AvroBSPMessageBundle< private Builder(AvroBSPMessageBundle<?> other) { super(AvroBSPMessageBundle.SCHEMA$); if (isValidValue(fields[0], other.data)) { - data = (java.nio.ByteBuffer) clone(other.data); + data = clone(other.data); fieldSetFlags[0] = true; } } - public final ByteBuffer clone(ByteBuffer original) { + public final static ByteBuffer clone(ByteBuffer original) { ByteBuffer clone = ByteBuffer.allocate(original.capacity()); original.rewind(); clone.put(original); Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sat May 19 08:32:28 2012 @@ -81,7 +81,7 @@ public final class AvroMessageManagerImp if (sender == null) { NettyTransceiver client = new NettyTransceiver(addr); - sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, client); + sender = SpecificRequestor.getClient(Sender.class, client); peers.put(addr, sender); } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java Sat May 19 08:32:28 2012 @@ -24,41 +24,45 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.TaskAttemptID; /** - * A disk based queue that is backed by a sequencefile. <br/> + * A disk based queue that is backed by a raw file on local disk. <br/> * Structure is as follows: <br/> * If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/> * ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ <br/> * An ongoing sequencenumber will be appended to prevent inner collisions, * however the job_id dir will never be deleted. So you need a cronjob to do the * cleanup for you. <br/> - * <b>This is currently not intended to be production ready</b> + * It is recommended to use the file:// scheme in front of the property, because + * writes on DFS are expensive, however your local disk may not have enough + * space for your message, so you can easily switch per job via your + * configuration. <br/> + * <b>It is experimental to use.</b> */ public final class DiskQueue<M extends Writable> implements MessageQueue<M> { public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir"; - private final Log LOG = LogFactory.getLog(DiskQueue.class); + private static final int MAX_RETRIES = 4; + private static final Log LOG = LogFactory.getLog(DiskQueue.class); private static volatile int ONGOING_SEQUENCE_NUMBER = 0; - private static final int MAX_RETRIES = 4; - private static final NullWritable NULL_WRITABLE = NullWritable.get(); private int size = 0; // injected via reflection private Configuration conf; private FileSystem fs; - private SequenceFile.Writer writer; - private SequenceFile.Reader reader; + private FSDataOutputStream writer; + private FSDataInputStream reader; + private Path queuePath; private TaskAttemptID id; private final ObjectWritable writable = new ObjectWritable(); @@ -71,17 +75,7 @@ public final class DiskQueue<M extends W fs = FileSystem.get(conf); String configuredQueueDir = conf.get(DISK_QUEUE_PATH_KEY); Path queueDir = null; - if (configuredQueueDir == null) { - String hamaTmpDir = conf.get("hama.tmp.dir"); - if (hamaTmpDir != null) { - queueDir = createDiskQueuePath(id, hamaTmpDir); - } else { - // use some local tmp dir - queueDir = createDiskQueuePath(id, "/tmp/messageStorage/"); - } - } else { - queueDir = createDiskQueuePath(id, configuredQueueDir); - } + queueDir = getQueueDir(conf, id, configuredQueueDir); fs.mkdirs(queueDir); queuePath = new Path(queueDir, (ONGOING_SEQUENCE_NUMBER++) + "_messages.seq"); @@ -109,6 +103,7 @@ public final class DiskQueue<M extends W private void closeInternal(boolean delete) { try { if (writer != null) { + writer.flush(); writer.close(); writer = null; } @@ -124,6 +119,7 @@ public final class DiskQueue<M extends W } if (writer != null) { try { + writer.flush(); writer.close(); writer = null; } catch (IOException e) { @@ -155,7 +151,7 @@ public final class DiskQueue<M extends W // make sure we've closed closeInternal(false); try { - reader = new SequenceFile.Reader(fs, queuePath, conf); + reader = fs.open(queuePath); } catch (IOException e) { // can't recover from that LOG.error(e); @@ -166,8 +162,7 @@ public final class DiskQueue<M extends W @Override public void prepareWrite() { try { - writer = new SequenceFile.Writer(fs, conf, queuePath, - ObjectWritable.class, NullWritable.class); + writer = fs.create(queuePath); } catch (IOException e) { // can't recover from that LOG.error(e); @@ -194,7 +189,7 @@ public final class DiskQueue<M extends W public final void add(M item) { size++; try { - writer.append(new ObjectWritable(item), NULL_WRITABLE); + new ObjectWritable(item).write(writer); } catch (IOException e) { LOG.error(e); } @@ -210,16 +205,19 @@ public final class DiskQueue<M extends W @SuppressWarnings("unchecked") @Override public final M poll() { + if (size == 0) { + return null; + } size--; int tries = 1; while (tries <= MAX_RETRIES) { try { - boolean next = reader.next(writable, NULL_WRITABLE); - if (next) { + writable.readFields(reader); + if (size > 0) { return (M) writable.get(); } else { - closeInternal(false); - return null; + closeInternal(true); + return (M) writable.get(); } } catch (IOException e) { LOG.error("Retrying for the " + tries + "th time!", e); @@ -277,11 +275,32 @@ public final class DiskQueue<M extends W } /** + * Creates a path for a queue + */ + public static Path getQueueDir(Configuration conf, TaskAttemptID id, + String configuredQueueDir) { + Path queueDir; + if (configuredQueueDir == null) { + String hamaTmpDir = conf.get("hama.tmp.dir"); + if (hamaTmpDir != null) { + queueDir = createDiskQueuePath(id, hamaTmpDir); + } else { + // use some local tmp dir + queueDir = createDiskQueuePath(id, "/tmp/messageStorage/"); + } + } else { + queueDir = createDiskQueuePath(id, configuredQueueDir); + } + return queueDir; + } + + /** * Creates a generic Path based on the configured path and the task attempt id * to store disk sequence files. <br/> * Structure is as follows: ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ */ - public static Path createDiskQueuePath(TaskAttemptID id, String configuredPath) { + private static Path createDiskQueuePath(TaskAttemptID id, + String configuredPath) { return new Path(new Path(new Path(configuredPath, "diskqueue"), id .getJobID().toString()), id.getTaskID().toString()); } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sat May 19 08:32:28 2012 @@ -32,6 +32,7 @@ import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BSPPeerImpl; import org.apache.hama.bsp.TaskAttemptID; import org.apache.hama.bsp.message.compress.BSPCompressedBundle; +import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.util.CompressionUtil; /** @@ -107,7 +108,7 @@ public final class HadoopMessageManagerI HadoopMessageManager<M> peer = peers.get(addr); if (peer == null) { peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class, - HadoopMessageManager.versionID, addr, this.conf); + HamaRPCProtocolVersion.versionID, addr, this.conf); this.peers.put(addr, peer); } return peer; Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java?rev=1340371&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java Sat May 19 08:32:28 2012 @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.util.Collection; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.bsp.TaskAttemptID; + +/** + * A global mutex based synchronized queue. + */ +public final class SingleLockQueue<T> implements SynchronizedQueue<T> { + + private final MessageQueue<T> queue; + private final Object mutex; + + private SingleLockQueue(MessageQueue<T> queue) { + this.queue = queue; + this.mutex = new Object(); + } + + private SingleLockQueue(MessageQueue<T> queue, Object mutex) { + this.queue = queue; + this.mutex = mutex; + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator() + */ + @Override + public Iterator<T> iterator() { + synchronized (mutex) { + return queue.iterator(); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop + * .conf.Configuration) + */ + @Override + public void setConf(Configuration conf) { + synchronized (mutex) { + queue.setConf(conf); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#getConf() + */ + @Override + public Configuration getConf() { + synchronized (mutex) { + return queue.getConf(); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.hama.bsp.message.SynchronizedQueue#init(org.apache.hadoop.conf + * .Configuration, org.apache.hama.bsp.TaskAttemptID) + */ + @Override + public void init(Configuration conf, TaskAttemptID id) { + synchronized (mutex) { + queue.init(conf, id); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#close() + */ + @Override + public void close() { + synchronized (mutex) { + } + queue.close(); + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#prepareRead() + */ + @Override + public void prepareRead() { + synchronized (mutex) { + queue.prepareRead(); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection) + */ + @Override + public void addAll(Collection<T> col) { + synchronized (mutex) { + queue.addAll(col); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#add(T) + */ + @Override + public void add(T item) { + synchronized (mutex) { + queue.add(item); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#clear() + */ + @Override + public void clear() { + synchronized (mutex) { + queue.clear(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#poll() + */ + @Override + public Object poll() { + synchronized (mutex) { + return queue.poll(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#size() + */ + @Override + public int size() { + synchronized (mutex) { + return queue.size(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.message.SynchronizedQueue#getMessageQueue() + */ + @Override + public MessageQueue<T> getMessageQueue() { + synchronized (mutex) { + return queue; + } + } + + /* + * static constructor methods to be type safe + */ + + public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) { + return new SingleLockQueue<T>(queue); + } + + public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue, + Object mutex) { + return new SingleLockQueue<T>(queue, mutex); + } +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java Sat May 19 08:32:28 2012 @@ -20,110 +20,34 @@ package org.apache.hama.bsp.message; import java.util.Collection; import java.util.Iterator; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hama.bsp.TaskAttemptID; /** - * A global mutex based synchronized queue. + * Synchronized Queue interface. Can be used to implement better synchronized + * datastructures. */ -public final class SynchronizedQueue<T> { +public interface SynchronizedQueue<T> extends Configurable { - private final MessageQueue<T> queue; - private final Object mutex; + public abstract Iterator<T> iterator(); - private SynchronizedQueue(MessageQueue<T> queue) { - this.queue = queue; - this.mutex = new Object(); - } - - private SynchronizedQueue(MessageQueue<T> queue, Object mutex) { - this.queue = queue; - this.mutex = mutex; - } - - public Iterator<T> iterator() { - synchronized (mutex) { - return queue.iterator(); - } - } - - public void setConf(Configuration conf) { - synchronized (mutex) { - queue.setConf(conf); - } - } - - public Configuration getConf() { - synchronized (mutex) { - return queue.getConf(); - } - } - - public void init(Configuration conf, TaskAttemptID id) { - synchronized (mutex) { - queue.init(conf, id); - } - } - - public void close() { - synchronized (mutex) { - } - queue.close(); - } - - public void prepareRead() { - synchronized (mutex) { - queue.prepareRead(); - } - } - - public void addAll(Collection<T> col) { - synchronized (mutex) { - queue.addAll(col); - } - } - - public void add(T item) { - synchronized (mutex) { - queue.add(item); - } - } - - public void clear() { - synchronized (mutex) { - queue.clear(); - } - } - - public Object poll() { - synchronized (mutex) { - return queue.poll(); - } - } - - public int size() { - synchronized (mutex) { - return queue.size(); - } - } - - public MessageQueue<T> getMessageQueue() { - synchronized (mutex) { - return queue; - } - } - - /* - * static constructor methods to be type safe - */ - - public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) { - return new SynchronizedQueue<T>(queue); - } - - public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue, - Object mutex) { - return new SynchronizedQueue<T>(queue, mutex); - } + public abstract void init(Configuration conf, TaskAttemptID id); + + public abstract void close(); + + public abstract void prepareRead(); + + public abstract void addAll(Collection<T> col); + + public abstract void add(T item); + + public abstract void clear(); + + public abstract Object poll(); + + public abstract int size(); + + public abstract MessageQueue<T> getMessageQueue(); } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Sat May 19 08:32:28 2012 @@ -34,6 +34,7 @@ public class Bzip2Compressor<M extends W private final BZip2Codec codec = new BZip2Codec(); + @Override public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) { BSPCompressedBundle compMsgBundle = null; ByteArrayOutputStream bos = null; @@ -71,6 +72,7 @@ public class Bzip2Compressor<M extends W * @param compMsgBundle * @return */ + @Override public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) { ByteArrayInputStream bis = null; CompressionInputStream sis = null; Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java Sat May 19 08:32:28 2012 @@ -31,6 +31,7 @@ import org.xerial.snappy.SnappyOutputStr public class SnappyCompressor<M extends Writable> implements BSPMessageCompressor<M> { + @Override public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) { BSPCompressedBundle compMsgBundle = null; ByteArrayOutputStream bos = null; @@ -68,6 +69,7 @@ public class SnappyCompressor<M extends * @param compMsgBundle * @return */ + @Override public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) { ByteArrayInputStream bis = null; SnappyInputStream sis = null; Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java Sat May 19 08:32:28 2012 @@ -117,7 +117,7 @@ public class MiniZooKeeperCluster { return clientPort; } - private void recreateDir(File dir) throws IOException { + private static void recreateDir(File dir) throws IOException { if (dir.exists()) { FileUtil.fullyDelete(dir); } Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat May 19 08:32:28 2012 @@ -132,62 +132,31 @@ public class TestBSPMasterGroomServer ex */ /* - * BEGIN: ZooKeeper tests. - - public void testClearZKNodes() throws IOException, KeeperException, - InterruptedException { - - // Clear any existing znode with the same path as bspRoot. - bspCluster.getBSPMaster().clearZKNodes(); - - int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, - 6000); - String connectStr = QuorumPeer.getZKQuorumServersString(configuration); - String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, - Constants.DEFAULT_ZOOKEEPER_ROOT); - - // Establishing a zk session. - ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() { - @Override - public void process(WatchedEvent event) { - // Do nothing.(Dummy Watcher) - } - }); - - // Creating dummy bspRoot if it doesn't already exist. - Stat s = zk.exists(bspRoot, false); - if (s == null) { - zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - - // Creating dummy child nodes at depth 1. - String node1 = bspRoot + "/task1"; - String node2 = bspRoot + "/task2"; - zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - // Creating dummy child node at depth 2. - String node11 = node1 + "/superstep1"; - zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false); - assertEquals(2, list.size()); - System.out.println(list.size()); - - bspCluster.getBSPMaster().clearZKNodes(); - - list = (ArrayList<String>) zk.getChildren(bspRoot, false); - System.out.println(list.size()); - assertEquals(0, list.size()); - - try { - zk.getData(node11, false, null); - fail(); - } catch (KeeperException.NoNodeException e) { - System.out.println("Node has been removed correctly!"); - } - } + * BEGIN: ZooKeeper tests. public void testClearZKNodes() throws IOException, + * KeeperException, InterruptedException { // Clear any existing znode with + * the same path as bspRoot. bspCluster.getBSPMaster().clearZKNodes(); int + * timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); + * String connectStr = QuorumPeer.getZKQuorumServersString(configuration); + * String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, + * Constants.DEFAULT_ZOOKEEPER_ROOT); // Establishing a zk session. ZooKeeper + * zk = new ZooKeeper(connectStr, timeout, new Watcher() { + * @Override public void process(WatchedEvent event) { // Do nothing.(Dummy + * Watcher) } }); // Creating dummy bspRoot if it doesn't already exist. Stat + * s = zk.exists(bspRoot, false); if (s == null) { zk.create(bspRoot, new + * byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // Creating dummy + * child nodes at depth 1. String node1 = bspRoot + "/task1"; String node2 = + * bspRoot + "/task2"; zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, + * CreateMode.PERSISTENT); zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, + * CreateMode.PERSISTENT); // Creating dummy child node at depth 2. String + * node11 = node1 + "/superstep1"; zk.create(node11, new byte[0], + * Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ArrayList<String> list = + * (ArrayList<String>) zk.getChildren(bspRoot, false); assertEquals(2, + * list.size()); System.out.println(list.size()); + * bspCluster.getBSPMaster().clearZKNodes(); list = (ArrayList<String>) + * zk.getChildren(bspRoot, false); System.out.println(list.size()); + * assertEquals(0, list.size()); try { zk.getData(node11, false, null); + * fail(); } catch (KeeperException.NoNodeException e) { + * System.out.println("Node has been removed correctly!"); } } */ /* Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sat May 19 08:32:28 2012 @@ -54,6 +54,7 @@ import org.apache.hama.bsp.sync.SyncClie import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; +import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.util.BSPNetUtils; public class TestBSPTaskFaults extends TestCase { @@ -97,7 +98,7 @@ public class TestBSPTaskFaults extends T @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return BSPPeerProtocol.versionID; + return HamaRPCProtocolVersion.versionID; } @Override @@ -176,6 +177,7 @@ public class TestBSPTaskFaults extends T job = jobConf; } + @Override @SuppressWarnings("rawtypes") public void run() { BSPTask task = new BSPTask(); @@ -213,7 +215,7 @@ public class TestBSPTaskFaults extends T testPort = port; } - private void readStream(InputStream input) throws IOException { + private static void readStream(InputStream input) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(input)); String line; while ((line = reader.readLine()) != null) { @@ -278,6 +280,7 @@ public class TestBSPTaskFaults extends T // We have errorLog and infoLog to prevent block on pipe between // child and parent process. errorLog = new Thread() { + @Override public void run() { try { readStream(bspTaskProcess.getErrorStream()); @@ -289,6 +292,7 @@ public class TestBSPTaskFaults extends T errorLog.start(); infoLog = new Thread() { + @Override public void run() { try { readStream(bspTaskProcess.getInputStream()); @@ -329,7 +333,7 @@ public class TestBSPTaskFaults extends T job.setOutputFormat(NullOutputFormat.class); final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, BSPPeerProtocol.versionID, + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, new InetSocketAddress("127.0.0.1", port), hamaConf); BSPTask task = new BSPTask(); @@ -339,6 +343,7 @@ public class TestBSPTaskFaults extends T + hamaConf.getInt(TEST_POINT, 0)); Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { try { proto.close(); @@ -430,7 +435,7 @@ public class TestBSPTaskFaults extends T conf.setInt("bsp.groom.rpc.port", inetAddress.getPort()); umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class, - BSPPeerProtocol.versionID, inetAddress, conf); + HamaRPCProtocolVersion.versionID, inetAddress, conf); LOG.info("Started the proxy connections"); this.testBSPTaskService = Executors.newScheduledThreadPool(1); Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sat May 19 08:32:28 2012 @@ -36,6 +36,7 @@ import org.apache.hama.bsp.message.type. import org.apache.hama.bsp.sync.SyncClient; import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; +import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.util.BSPNetUtils; public class TestCheckpoint extends TestCase { @@ -101,7 +102,7 @@ public class TestCheckpoint extends Test conf.setInt("bsp.groom.rpc.port", inetAddress.getPort()); BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, BSPPeerProtocol.versionID, inetAddress, conf); + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, conf); LOG.info("Started the proxy connections"); TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID( @@ -109,9 +110,10 @@ public class TestCheckpoint extends Test try { BSPJob job = new BSPJob(conf); - job.setOutputFormat(NullOutputFormat.class); + job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH); + job.setOutputFormat(TextOutputFormat.class); final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, BSPPeerProtocol.versionID, + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, new InetSocketAddress("127.0.0.1", port), conf); BSPTask task = new BSPTask(); Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Sat May 19 08:32:28 2012 @@ -32,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBu public class TestClusterStatus extends TestCase { Random rnd = new Random(); + @Override protected void setUp() throws Exception { super.setUp(); } Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sat May 19 08:32:28 2012 @@ -22,32 +22,20 @@ package org.apache.hama.bsp; import java.io.IOException; import java.util.ArrayList; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hama.Constants; import org.apache.hama.HamaCluster; import org.apache.hama.HamaConfiguration; -import org.apache.hama.examples.ClassSerializePrinting; import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TestZooKeeper extends HamaCluster { - private static Log LOG = LogFactory.getLog(TestZooKeeper.class); - private HamaConfiguration configuration; public TestZooKeeper() { @@ -63,10 +51,12 @@ public class TestZooKeeper extends HamaC .getCanonicalName()); } + @Override public void setUp() throws Exception { super.setUp(); } + @Override public void tearDown() throws Exception { super.tearDown(); } Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Sat May 19 08:32:28 2012 @@ -93,7 +93,7 @@ public class TestAvroMessageManager exte } - public final BSPMessageBundle<Writable> getRandomBundle() { + public final static BSPMessageBundle<Writable> getRandomBundle() { BSPMessageBundle<Writable> bundle = new BSPMessageBundle<Writable>(); for (int i = 0; i < INT_MSG_COUNT; i++) { Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat May 19 08:32:28 2012 @@ -55,7 +55,7 @@ public class TestHadoopMessageManager ex messagingInternal(conf); } - private void messagingInternal(Configuration conf) throws Exception { + private static void messagingInternal(Configuration conf) throws Exception { conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl"); MessageManager<IntWritable> messageManager = MessageManagerFactory