Author: edwardyoon
Date: Wed Feb 8 12:29:23 2012
New Revision: 1241883

URL: http://svn.apache.org/viewvc?rev=1241883&view=rev
Log:
Add Avro RPC.

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/conf/hama-default.xml incubator/hama/trunk/core/pom.xml incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java incubator/hama/trunk/pom.xml Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Wed Feb 8 12:29:23 2012 @@ -3,9 +3,10 @@ Hama Change Log Release 0.5 - Unreleased NEW FEATURES - + + HAMA-501: Add Avro RPC (tjungblut) HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon) - + BUG FIXES IMPROVEMENTS Modified: incubator/hama/trunk/conf/hama-default.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/conf/hama-default.xml (original) +++ incubator/hama/trunk/conf/hama-default.xml Wed Feb 8 12:29:23 2012 @@ -156,6 +156,11 @@ </property> <property> + <name>hama.messanger.class</name> + <value>org.apache.hama.bsp.message.AvroMessageManagerImpl</value> + </property> + + <property> <name>hama.zookeeper.quorum</name> <value>localhost</value> <description>Comma separated list of servers in the ZooKeeper quorum. 
Modified: incubator/hama/trunk/core/pom.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/core/pom.xml (original) +++ incubator/hama/trunk/core/pom.xml Wed Feb 8 12:29:23 2012 @@ -91,11 +91,30 @@ <artifactId>hadoop-test</artifactId> </dependency> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + </dependency> + <dependency> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1241883&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Wed Feb 8 12:29:23 2012 @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.nio.ByteBuffer; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; + +public final class AvroBSPMessageBundle extends SpecificRecordBase implements + SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema + .parse("{\"type\":\"record\",\"name\":\"AvroBSPMessage\",\"namespace\":\"de.jungblut.avro\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}"); + @Deprecated + public java.nio.ByteBuffer data; + + public final org.apache.avro.Schema getSchema() { + return SCHEMA$; + } + + // Used by DatumWriter. Applications should not call. + public final java.lang.Object get(int field$) { + switch (field$) { + case 0: + return data; + default: + throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. + public final void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: + data = (java.nio.ByteBuffer) value$; + break; + default: + throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'data' field. + */ + public final java.nio.ByteBuffer getData() { + return data; + } + + /** + * Sets the value of the 'data' field. + * + * @param value the value to set. 
+ */ + public final void setData(java.nio.ByteBuffer value) { + this.data = value; + } + + /** Creates a new AvroBSPMessage RecordBuilder */ + public final static AvroBSPMessageBundle.Builder newBuilder() { + return new AvroBSPMessageBundle.Builder(); + } + + /** Creates a new AvroBSPMessage RecordBuilder by copying an existing Builder */ + public final static AvroBSPMessageBundle.Builder newBuilder( + AvroBSPMessageBundle.Builder other) { + return new AvroBSPMessageBundle.Builder(other); + } + + /** + * Creates a new AvroBSPMessage RecordBuilder by copying an existing + * AvroBSPMessage instance + */ + public final static AvroBSPMessageBundle.Builder newBuilder( + AvroBSPMessageBundle other) { + return new AvroBSPMessageBundle.Builder(other); + } + + /** + * RecordBuilder for AvroBSPMessage instances. + */ + public final static class Builder extends + org.apache.avro.specific.SpecificRecordBuilderBase<AvroBSPMessageBundle> + implements org.apache.avro.data.RecordBuilder<AvroBSPMessageBundle> { + + private java.nio.ByteBuffer data; + + /** Creates a new Builder */ + private Builder() { + super(AvroBSPMessageBundle.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(AvroBSPMessageBundle.Builder other) { + super(other); + } + + /** Creates a Builder by copying an existing AvroBSPMessage instance */ + private Builder(AvroBSPMessageBundle other) { + super(AvroBSPMessageBundle.SCHEMA$); + if (isValidValue(fields[0], other.data)) { + data = (java.nio.ByteBuffer) clone(other.data); + fieldSetFlags[0] = true; + } + } + + public final ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.capacity()); + original.rewind(); + clone.put(original); + original.rewind(); + clone.flip(); + return clone; + } + + /** Gets the value of the 'data' field */ + public final java.nio.ByteBuffer getData() { + return data; + } + + /** Sets the value of the 'data' field */ + public final AvroBSPMessageBundle.Builder 
setData(java.nio.ByteBuffer value) { + validate(fields[0], value); + this.data = value; + fieldSetFlags[0] = true; + return this; + } + + /** Checks whether the 'data' field has been set */ + public final boolean hasData() { + return fieldSetFlags[0]; + } + + /** Clears the value of the 'data' field */ + public final AvroBSPMessageBundle.Builder clearData() { + data = null; + fieldSetFlags[0] = false; + return this; + } + + @Override + public final AvroBSPMessageBundle build() { + try { + AvroBSPMessageBundle record = new AvroBSPMessageBundle(); + record.data = fieldSetFlags[0] ? this.data + : (java.nio.ByteBuffer) getDefaultValue(fields[0]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1241883&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Wed Feb 8 12:29:23 2012 @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.bsp.BSPMessage; +import org.apache.hama.bsp.BSPMessageBundle; +import org.apache.hama.bsp.message.MessageManager; +import org.apache.hama.util.BSPNetUtils; + +public class AvroMessageManagerImpl implements MessageManager, Sender { + + private static final Log LOG = LogFactory + .getLog(AvroMessageManagerImpl.class); + + private NettyServer server = null; + + private final HashMap<InetSocketAddress, Sender> peers = new HashMap<InetSocketAddress, Sender>(); + private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>(); + + private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<InetSocketAddress, 
LinkedList<BSPMessage>>(); + private Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>(); + // this must be a synchronized implementation: this is accessed per RPC + private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>(); + + @Override + public void init(Configuration conf, InetSocketAddress addr) { + server = new NettyServer(new SpecificResponder(Sender.class, this), addr); + } + + @Override + public void close() { + server.close(); + } + + @Override + public void clearOutgoingQueues() { + this.outgoingQueues.clear(); + localQueue.addAll(localQueueForNextIteration); + localQueueForNextIteration.clear(); + } + + public void put(BSPMessageBundle messages) { + for (BSPMessage message : messages.getMessages()) { + this.localQueueForNextIteration.add(message); + } + } + + @Override + public int getNumCurrentMessages() { + return localQueue.size(); + } + + @Override + public void transfer(InetSocketAddress addr, BSPMessageBundle bundle) + throws IOException { + AvroBSPMessageBundle msg = new AvroBSPMessageBundle(); + msg.setData(serializeMessage(bundle)); + Sender sender = peers.get(addr); + + if (sender == null) { + NettyTransceiver client = new NettyTransceiver(addr); + sender = (Sender) SpecificRequestor.getClient(Sender.class, client); + peers.put(addr, sender); + } + + sender.transfer(msg); + } + + @Override + public Void transfer(AvroBSPMessageBundle messagebundle) + throws AvroRemoteException { + try { + BSPMessageBundle deserializeMessage = deserializeMessage(messagebundle + .getData()); + this.put(deserializeMessage); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public BSPMessage getCurrentMessage() throws IOException { + return localQueue.poll(); + } + + @Override + public void send(String peerName, BSPMessage msg) throws IOException { + LOG.debug("Send message (" + msg.toString() + ") to " + peerName); + InetSocketAddress targetPeerAddress = 
null; + // Get socket for target peer. + if (peerSocketCache.containsKey(peerName)) { + targetPeerAddress = peerSocketCache.get(peerName); + } else { + targetPeerAddress = BSPNetUtils.getAddress(peerName); + peerSocketCache.put(peerName, targetPeerAddress); + } + LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress); + if (queue == null) { + queue = new LinkedList<BSPMessage>(); + } + queue.add(msg); + outgoingQueues.put(targetPeerAddress, queue); + } + + private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer) + throws IOException { + BSPMessageBundle msg = new BSPMessageBundle(); + + ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array()); + DataInputStream in = new DataInputStream(inArray); + msg.readFields(in); + + return msg; + } + + private static final ByteBuffer serializeMessage(BSPMessageBundle msg) + throws IOException { + ByteArrayOutputStream outArray = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(outArray); + msg.write(out); + out.close(); + System.out.println("serialized " + outArray.size() + " bytes"); + return ByteBuffer.wrap(outArray.toByteArray()); + } + + @Override + public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() { + return this.outgoingQueues.entrySet().iterator(); + } +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Wed Feb 8 12:29:23 2012 @@ -34,7 +34,7 @@ public class MessageManagerFactory { throws 
ClassNotFoundException { return (MessageManager) ReflectionUtils.newInstance(conf .getClassByName(conf.get(MESSAGE_MANAGER_CLASS, - org.apache.hama.bsp.message.HadoopMessageManagerImpl.class + org.apache.hama.bsp.message.AvroMessageManagerImpl.class .getCanonicalName())), conf); } Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1241883&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Wed Feb 8 12:29:23 2012 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp.message; + +public interface Sender { + public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol + .parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}"); + + java.lang.Void transfer(AvroBSPMessageBundle messagebundle) + throws org.apache.avro.AvroRemoteException; + + @SuppressWarnings("all") + public interface Callback extends Sender { + public static final org.apache.avro.Protocol PROTOCOL = Sender.PROTOCOL; + + void transfer(AvroBSPMessageBundle messagebundle, + org.apache.avro.ipc.Callback<java.lang.Void> callback) + throws java.io.IOException; + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1241883&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Wed Feb 8 12:29:23 2012 @@ -0,0 +1,151 @@ +package org.apache.hama.bsp.message; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import 
org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hama.bsp.BSPMessageBundle; +import org.apache.hama.bsp.BooleanMessage; +import org.apache.hama.bsp.DoubleMessage; +import org.apache.hama.bsp.IntegerMessage; + +public class TestAvroMessageManager { + + private static NettyServer server; + private static Server hadoopServer; + private static long start; + + public static final class MessageSender implements Sender { + + @Override + public Void transfer(AvroBSPMessageBundle messagebundle) + throws AvroRemoteException { + try { + BSPMessageBundle msg = deserializeMessage(messagebundle.data); + System.out.println("Received message in " + + (System.currentTimeMillis() - start) + "ms"); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + } + + private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer) + throws IOException { + BSPMessageBundle msg = new BSPMessageBundle(); + + ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array()); + DataInputStream in = new DataInputStream(inArray); + msg.readFields(in); + + return msg; + } + + private static final ByteBuffer serializeMessage(BSPMessageBundle msg) + throws IOException { + ByteArrayOutputStream outArray = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(outArray); + msg.write(out); + out.close(); + System.out.println("serialized " + outArray.size() + " bytes"); + return ByteBuffer.wrap(outArray.toByteArray()); + } + + public static final BSPMessageBundle getRandomBundle() { + BSPMessageBundle bundle = new BSPMessageBundle(); + + for (int i = 0; i < 500000; i++) { + bundle.addMessage(new IntegerMessage("test", i)); + } + + for (int i = 0; i < 10000; i++) { + bundle.addMessage(new BooleanMessage("test123", i % 2 == 0)); + } + + Random r = new Random(); + 
for (int i = 0; i < 400000; i++) { + bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble())); + } + + return bundle; + } + + public static final void main(String[] args) throws IOException { + BSPMessageBundle randomBundle = getRandomBundle(); + testAvro(randomBundle); + testHadoop(randomBundle); + } + + private static final void testAvro(BSPMessageBundle bundle) + throws IOException, AvroRemoteException { + + server = new NettyServer(new SpecificResponder(Sender.class, + new MessageSender()), new InetSocketAddress(13530)); + + NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(server + .getPort())); + Sender proxy = (Sender) SpecificRequestor.getClient(Sender.class, client); + + AvroBSPMessageBundle msg = new AvroBSPMessageBundle(); + + msg.setData(serializeMessage(bundle)); + + start = System.currentTimeMillis(); + proxy.transfer(msg); + + server.close(); + client.close(); + } + + private static interface RPCTestInterface extends VersionedProtocol { + + public void transfer(BSPMessageBundle bundle); + + } + + private static class HadoopRPCInstance implements RPCTestInterface { + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return 0; + } + + @Override + public void transfer(BSPMessageBundle bundle) { + System.out.println("Received message in " + + (System.currentTimeMillis() - start) + "ms"); + } + + } + + private static final void testHadoop(BSPMessageBundle bundle) + throws IOException { + Configuration conf = new Configuration(); + HadoopRPCInstance hadoopRPCInstance = new HadoopRPCInstance(); + hadoopServer = new Server(hadoopRPCInstance, conf, new InetSocketAddress( + 13612).getHostName(), 13612); + hadoopServer.start(); + RPCTestInterface proxy = (RPCTestInterface) RPC.getProxy( + RPCTestInterface.class, 0, new InetSocketAddress(13612), conf); + start = System.currentTimeMillis(); + proxy.transfer(bundle); + hadoopServer.stop(); + } + +} Modified: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Wed Feb 8 12:29:23 2012 @@ -34,6 +34,7 @@ public class TestHadoopMessageManager ex public void testMessaging() throws Exception { Configuration conf = new Configuration(); + conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl"); MessageManager messageManager = MessageManagerFactory .getMessageManager(conf); Modified: incubator/hama/trunk/pom.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff ============================================================================== --- incubator/hama/trunk/pom.xml (original) +++ incubator/hama/trunk/pom.xml Wed Feb 8 12:29:23 2012 @@ -185,6 +185,32 @@ <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + <version>1.6.0</version> + </dependency> + <dependency> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + <version>3.2.6.Final</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>1.9.2</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + 
<artifactId>jackson-mapper-asl</artifactId> + <version>1.9.2</version> + </dependency> + </dependencies> </dependencyManagement>