Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java Tue Sep
30 00:44:14 2014
@@ -0,0 +1,671 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * An abstract IPC service using netty. IPC calls take a single {@link
Writable}
+ * as a parameter, and return a {@link Writable}*
+ *
+ * @see AsyncServer
+ */
+public abstract class AsyncServer {
+
+ private AuthMethod authMethod;
+ static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+ static int INITIAL_RESP_BUF_SIZE = 1024;
+ UserGroupInformation user = null;
+ // 1 : Introduce ping and server does not throw away RPCs
+ // 3 : Introduce the protocol into the RPC connection header
+ // 4 : Introduced SASL security layer
+ static final byte CURRENT_VERSION = 4;
+ static final int HEADER_LENGTH = 10;
+ // follows version is read.
+ private Configuration conf;
+ private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ private int backlogLength;;
+ InetSocketAddress address;
+ private static final Log LOG = LogFactory.getLog(AsyncServer.class);
+ private static int NIO_BUFFER_LIMIT = 8 * 1024;
+ private final int maxRespSize;
+ static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"ipc.server.max.response.size";
+ static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;
+
+ private static final ThreadLocal<AsyncServer> SERVER = new
ThreadLocal<AsyncServer>();
+ private int port; // port we listen on
+ private Class<? extends Writable> paramClass; // class of call parameters
+ // Configure the server.(constructor is thread num)
+ private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ private EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private static final Map<String, Class<?>> PROTOCOL_CACHE = new
ConcurrentHashMap<String, Class<?>>();
+ private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
+
+ static Class<?> getProtocolClass(String protocolName, Configuration conf)
+ throws ClassNotFoundException {
+ Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
+ if (protocol == null) {
+ protocol = conf.getClassByName(protocolName);
+ PROTOCOL_CACHE.put(protocolName, protocol);
+ }
+ return protocol;
+ }
+
+ /**
+ * Getting address
+ *
+ * @return InetSocketAddress
+ */
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ /**
+ * Returns the server instance called under or null. May be called under
+ * {@link #call(Writable, long)} implementations, and under {@link Writable}
+ * methods of paramters and return values. Permits applications to access the
+ * server context.
+ *
+ * @return NioServer
+ */
+ public static AsyncServer get() {
+ return SERVER.get();
+ }
+
+ /**
+ * Constructs a server listening on the named port and address. Parameters
+ * passed must be of the named class. The
+ * <code>handlerCount</handlerCount> determines
+ * the number of handler threads that will be used to process calls.
+ *
+ * @param bindAddress
+ * @param port
+ * @param paramClass
+ * @param handlerCount
+ * @param conf
+ * @throws IOException
+ */
+ protected AsyncServer(String bindAddress, int port,
+ Class<? extends Writable> paramClass, int handlerCount, Configuration
conf)
+ throws IOException {
+ this(bindAddress, port, paramClass, handlerCount, conf, Integer
+ .toString(port), null);
+ }
+
+ protected AsyncServer(String bindAddress, int port,
+ Class<? extends Writable> paramClass, int handlerCount,
+ Configuration conf, String serverName) throws IOException {
+ this(bindAddress, port, paramClass, handlerCount, conf, serverName, null);
+ }
+
+ protected AsyncServer(String bindAddress, int port,
+ Class<? extends Writable> paramClass, int handlerCount,
+ Configuration conf, String serverName,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ this.conf = conf;
+ this.port = port;
+ this.address = new InetSocketAddress(bindAddress, port);
+ this.paramClass = paramClass;
+ this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
+ IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+
+ this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
+ this.backlogLength = conf.getInt("ipc.server.listen.queue.size", 100);
+ }
+
+ /** start server listener */
+ public void start() {
+ new NioServerListener().start();
+ }
+
+ private class NioServerListener extends Thread {
+
+ /**
+ * Configure and start nio server
+ */
+ @Override
+ public void run() {
+ SERVER.set(AsyncServer.this);
+ try {
+ // ServerBootstrap is a helper class that sets up a server
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, backlogLength)
+ .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
+ .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
+ .childOption(ChannelOption.RCVBUF_ALLOCATOR,
+ new FixedRecvByteBufAllocator(100 * 1024))
+
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ // Register accumulation processing handler
+ p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
+ // Register message processing handler
+ p.addLast(new NioServerInboundHandler());
+ }
+ });
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(port).sync();
+ LOG.info("AsyncServer startup");
+ // Wait until the server socket is closed.
+ f.channel().closeFuture().sync();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ // Shut down Server gracefully
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+ }
+
+ /** Stops the server gracefully. */
+ public void stop() {
+ if (bossGroup != null && !bossGroup.isTerminated()) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null && !workerGroup.isTerminated()) {
+ workerGroup.shutdownGracefully();
+ }
+ LOG.info("AsyncServer gracefully shutdown");
+ }
+
+ /**
+ * This class dynamically accumulate the recieved data by the value of the
+ * length field in the message
+ */
+ public class NioFrameDecoder extends LengthFieldBasedFrameDecoder {
+
+ /**
+ * @param maxFrameLength - the maximum length of the frame
+ * @param lengthFieldOffset - the offset of the length field
+ * @param lengthFieldLength - the length of the length field
+ * @param lengthAdjustment - the compensation value to add to the value of
+ * the length field
+ * @param initialBytesToStrip - the number of first bytes to strip out from
+ * the decoded frame
+ */
+ public NioFrameDecoder(int maxFrameLength, int lengthFieldOffset,
+ int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
+ super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
+ lengthAdjustment, initialBytesToStrip);
+ }
+
+ /**
+ * Decode(Accumulate) the from one ByteBuf to an other
+ *
+ * @param ctx
+ * @param in
+ */
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
+ throws Exception {
+ ByteBuf recvBuff = (ByteBuf) super.decode(ctx, in);
+ if (recvBuff == null) {
+ return null;
+ }
+ return recvBuff;
+ }
+ }
+
+ /**
+ * This class process received message from client and send response message.
+ */
+ private class NioServerInboundHandler extends ChannelInboundHandlerAdapter {
+ ConnectionHeader header = new ConnectionHeader();
+ Class<?> protocol;
+ private String errorClass = null;
+ private String error = null;
+ private boolean rpcHeaderRead = false; // if initial rpc header is read
+ private boolean headerRead = false; // if the connection header that
follows
+ // version is read.
+
+ /**
+ * Be invoked only one when a connection is established and ready to
+ * generate traffic
+ *
+ * @param ctx
+ */
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ SERVER.set(AsyncServer.this);
+ }
+
+ /**
+ * Process a recieved message from client. This method is called with the
+ * received message, whenever new data is received from a client.
+ *
+ * @param ctx
+ * @param cause
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ByteBuffer dataLengthBuffer = ByteBuffer.allocate(4);
+ ByteBuf byteBuf = (ByteBuf) msg;
+
+ ByteBuffer data = null;
+ ByteBuffer rpcHeaderBuffer = null;
+ try {
+ while (true) {
+ Call call = null;
+ errorClass = null;
+ error = null;
+ try {
+ if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) {
+ byteBuf.readBytes(dataLengthBuffer);
+ if (dataLengthBuffer.remaining() > 0 && byteBuf.isReadable()) {
+ return;
+ }
+ } else {
+ return;
+ }
+
+ // read rpcHeader
+ if (!rpcHeaderRead) {
+ // Every connection is expected to send the header.
+ if (rpcHeaderBuffer == null) {
+ dataLengthBuffer = null;
+ dataLengthBuffer = ByteBuffer.allocate(4);
+ byteBuf.readBytes(dataLengthBuffer);
+ rpcHeaderBuffer = ByteBuffer.allocate(2);
+ }
+ byteBuf.readBytes(rpcHeaderBuffer);
+ if (!rpcHeaderBuffer.hasArray()
+ || rpcHeaderBuffer.remaining() > 0) {
+ return;
+ }
+ int version = rpcHeaderBuffer.get(0);
+ byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
+ try {
+ authMethod = AuthMethod.read(new DataInputStream(
+ new ByteArrayInputStream(method)));
+ dataLengthBuffer.flip();
+ } catch (IOException ioe) {
+ errorClass = ioe.getClass().getName();
+ error = StringUtils.stringifyException(ioe);
+ }
+
+ if (!HEADER.equals(dataLengthBuffer)
+ || version != CURRENT_VERSION) {
+ LOG.warn("Incorrect header or version mismatch from "
+ + address.getHostName() + ":" + address.getPort()
+ + " got version " + version + " expected version "
+ + CURRENT_VERSION);
+ return;
+ }
+ dataLengthBuffer.clear();
+ if (authMethod == null) {
+ throw new RuntimeException(
+ "Unable to read authentication method");
+ }
+ rpcHeaderBuffer = null;
+ rpcHeaderRead = true;
+ continue;
+ }
+
+ // read data length and allocate buffer;
+ if (data == null) {
+ dataLengthBuffer.flip();
+ int dataLength = dataLengthBuffer.getInt();
+ if (dataLength < 0) {
+ LOG.warn("Unexpected data length " + dataLength + "!! from "
+ + address.getHostName());
+ }
+ data = ByteBuffer.allocate(dataLength);
+ }
+
+ // read received data
+ byteBuf.readBytes(data);
+ if (data.remaining() == 0) {
+ dataLengthBuffer.clear();
+ data.flip();
+ boolean isHeaderRead = headerRead;
+ call = processOneRpc(data.array());
+ data = null;
+ if (!isHeaderRead) {
+ continue;
+ }
+ }
+ } catch (OutOfMemoryError oome) {
+ // we can run out of memory if we have too many threads
+ // log the event and sleep for a minute and give
+ // some thread(s) a chance to finish
+ //
+ LOG.warn("Out of Memory in server select", oome);
+ try {
+ Thread.sleep(60000);
+ errorClass = oome.getClass().getName();
+ error = StringUtils.stringifyException(oome);
+ } catch (Exception ie) {
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception in Responder "
+ + StringUtils.stringifyException(e));
+ errorClass = e.getClass().getName();
+ error = StringUtils.stringifyException(e);
+ }
+ sendResponse(ctx, call);
+ }
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+
+ /**
+ * Send response data to client
+ *
+ * @param ctx
+ * @param call
+ */
+ private void sendResponse(ChannelHandlerContext ctx, Call call) {
+ ByteArrayOutputStream buf = new ByteArrayOutputStream(
+ INITIAL_RESP_BUF_SIZE);
+ Writable value = null;
+ try {
+ value = call(protocol, call.param, call.timestamp);
+ } catch (Throwable e) {
+ String logMsg = this.getClass().getName() + ", call " + call
+ + ": error: " + e;
+ if (e instanceof RuntimeException || e instanceof Error) {
+ // These exception types indicate something is probably wrong
+ // on the server side, as opposed to just a normal exceptional
+ // result.
+ LOG.warn(logMsg, e);
+ } else if (exceptionsHandler.isTerse(e.getClass())) {
+ // Don't log the whole stack trace of these exceptions.
+ // Way too noisy!
+ LOG.info(logMsg);
+ } else {
+ LOG.info(logMsg, e);
+ }
+ errorClass = e.getClass().getName();
+ error = StringUtils.stringifyException(e);
+ }
+ try {
+ setupResponse(buf, call, (error == null) ? Status.SUCCESS
+ : Status.ERROR, value, errorClass, error);
+ if (buf.size() > maxRespSize) {
+ LOG.warn("Large response size " + buf.size() + " for call "
+ + call.toString());
+ buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
+ }
+ // send response data;
+ channelWrite(ctx, call.response);
+ } catch (Exception e) {
+ LOG.info(this.getClass().getName() + " caught: "
+ + StringUtils.stringifyException(e));
+ error = null;
+ } finally {
+ IOUtils.closeStream(buf);
+ }
+ }
+
+ /**
+ * read header or data
+ *
+ * @param buf
+ * @return
+ */
+ private Call processOneRpc(byte[] buf) throws IOException {
+ if (headerRead) {
+ return processData(buf);
+ } else {
+ processHeader(buf);
+ headerRead = true;
+ return null;
+ }
+ }
+
+ /**
+ * Reads the connection header following version
+ *
+ * @param buf buffer
+ */
+ private void processHeader(byte[] buf) {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
+ try {
+ header.readFields(in);
+ String protocolClassName = header.getProtocol();
+ if (protocolClassName != null) {
+ protocol = getProtocolClass(header.getProtocol(), conf);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+
+ UserGroupInformation protocolUser = header.getUgi();
+ user = protocolUser;
+ }
+
+ /**
+ *
+ * Reads the received data, create call object;
+ *
+ * @param buf buffer to serialize the response into
+ * @return the IPC Call
+ */
+ private Call processData(byte[] buf) {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
+ try {
+ int id = dis.readInt(); // try to read an id
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(" got #" + id);
+ Writable param = ReflectionUtils.newInstance(paramClass, conf);
+ param.readFields(dis); // try to read param data
+
+ Call call = new Call(id, param, this);
+
+ return call;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ IOUtils.closeStream(dis);
+ }
+ }
+ }
+
+ /**
+ * Setup response for the IPC Call.
+ *
+ * @param response buffer to serialize the response into
+ * @param call {@link Call} to which we are setting up the response
+ * @param status {@link Status} of the IPC call
+ * @param rv return value for the IPC Call, if the call was successful
+ * @param errorClass error class, if the the call failed
+ * @param error error message, if the call failed
+ * @throws IOException
+ */
+ private void setupResponse(ByteArrayOutputStream response, Call call,
+ Status status, Writable rv, String errorClass, String error)
+ throws IOException {
+ response.reset();
+ DataOutputStream out = new DataOutputStream(response);
+ out.writeInt(call.id); // write call id
+ out.writeInt(status.state); // write status
+
+ if (status == Status.SUCCESS) {
+ rv.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ call.setResponse(ByteBuffer.wrap(response.toByteArray()));
+ IOUtils.closeStream(out);
+ }
+
+ /**
+ * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}. If
+ * the amount of data is large, it writes to channel in smaller chunks. This
+ * is to avoid jdk from creating many direct buffers as the size of buffer
+ * increases. This also minimizes extra copies in NIO layer as a result of
+ * multiple write operations required to write a large buffer.
+ *
+ * @see WritableByteChannel#write(ByteBuffer)
+ *
+ * @param ctx
+ * @param buffer
+ */
+ private void channelWrite(ChannelHandlerContext ctx, ByteBuffer buffer) {
+ try {
+ ByteBuf buf = ctx.alloc().buffer();
+ buf.writeBytes(buffer.array());
+ ctx.writeAndFlush(buf);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** A call queued for handling. */
+ private static class Call {
+ private int id; // the client's call id
+ private Writable param; // the parameter passed
+ private ChannelInboundHandlerAdapter connection; // connection to client
+ private long timestamp; // the time received when response is null
+ // the time served when response is not null
+ private ByteBuffer response; // the response for this call
+
+ /**
+ *
+ * @param id
+ * @param param
+ * @param connection
+ */
+ public Call(int id, Writable param, ChannelInboundHandlerAdapter
connection) {
+ this.id = id;
+ this.param = param;
+ this.connection = connection;
+ this.timestamp = System.currentTimeMillis();
+ this.response = null;
+ }
+
+ /**
+ *
+ */
+ @Override
+ public String toString() {
+ return param.toString() + " from " + connection.toString();
+ }
+
+ /**
+ *
+ * @param response
+ */
+ public void setResponse(ByteBuffer response) {
+ this.response = response;
+ }
+ }
+
+ /**
+ * ExceptionsHandler manages Exception groups for special handling e.g.,
terse
+ * exception group for concise logging messages
+ */
+ static class ExceptionsHandler {
+ private volatile Set<String> terseExceptions = new HashSet<String>();
+
+ /**
+ * Add exception class so server won't log its stack trace. Modifying the
+ * terseException through this method is thread safe.
+ *
+ * @param exceptionClass exception classes
+ */
+ void addTerseExceptions(Class<?>... exceptionClass) {
+
+ // Make a copy of terseException for performing modification
+ final HashSet<String> newSet = new HashSet<String>(terseExceptions);
+
+ // Add all class names into the HashSet
+ for (Class<?> name : exceptionClass) {
+ newSet.add(name.toString());
+ }
+ // Replace terseException set
+ terseExceptions = Collections.unmodifiableSet(newSet);
+ }
+
+ /**
+ *
+ * @param t
+ * @return
+ */
+ boolean isTerse(Class<?> t) {
+ return terseExceptions.contains(t.toString());
+ }
+ }
+
+ /**
+ * Called for each call.
+ *
+ * @param protocol
+ * @param param
+ * @param receiveTime
+ * @return Writable
+ * @throws IOException
+ */
+ public abstract Writable call(Class<?> protocol, Writable param,
+ long receiveTime) throws IOException;
+}
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1628344&view=auto
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
(added)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
Tue Sep 30 00:44:14 2014
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.util.BSPNetUtils;
+
+public class TestHamaAsyncMessageManager extends TestCase {
+
+ public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
+ // increment is here to solve race conditions in parallel execution to choose
+ // other ports.
+ public static volatile int increment = 1;
+
+ public void testMemoryMessaging() throws Exception {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class);
+ conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+ messagingInternal(conf);
+ }
+
+ public void testDiskMessaging() throws Exception {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+ conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
+ MessageQueue.class);
+ messagingInternal(conf);
+ }
+
+ private static void messagingInternal(HamaConfiguration conf)
+ throws Exception {
+ conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
+ "org.apache.hama.bsp.message.HamaAsyncMessageManagerImpl");
+ MessageManager<IntWritable> messageManager = MessageManagerFactory
+ .getMessageManager(conf);
+
+ assertTrue(messageManager instanceof HamaAsyncMessageManagerImpl);
+
+ InetSocketAddress peer = new InetSocketAddress(
+ BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
+ + (increment++));
+ conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
+ BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable,
NullWritable, NullWritable, NullWritable, IntWritable>(
+ conf, FileSystem.get(conf), new Counters());
+ TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
+ messageManager.init(id, dummyPeer, conf, peer);
+ peer = messageManager.getListenerAddress();
+ String peerName = peer.getHostName() + ":" + peer.getPort();
+ System.out.println("Peer is " + peerName);
+ messageManager.send(peerName, new IntWritable(1337));
+
+ Iterator<Entry<InetSocketAddress, BSPMessageBundle<IntWritable>>>
messageIterator = messageManager
+ .getOutgoingBundles();
+
+ Entry<InetSocketAddress, BSPMessageBundle<IntWritable>> entry =
messageIterator
+ .next();
+
+ assertEquals(entry.getKey(), peer);
+
+ assertTrue(entry.getValue().size() == 1);
+
+ BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+ Iterator<IntWritable> it = entry.getValue().iterator();
+ while (it.hasNext()) {
+ bundle.addMessage(it.next());
+ }
+
+ messageManager.transfer(peer, bundle);
+
+ messageManager.clearOutgoingMessages();
+
+ assertTrue(messageManager.getNumCurrentMessages() == 1);
+ IntWritable currentMessage = messageManager.getCurrentMessage();
+
+ assertEquals(currentMessage.get(), 1337);
+ messageManager.close();
+ }
+}
Propchange:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java Tue Sep
30 00:44:14 2014
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
+public class TestAsyncIPC extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
+
+ final private static Configuration conf = new Configuration();
+ final static private int PING_INTERVAL = 1000;
+
+ static {
+ Client.setPingInterval(conf, PING_INTERVAL);
+ }
+
+ public TestAsyncIPC(String name) {
+ super(name);
+ }
+
+ private static final Random RANDOM = new Random();
+
+ private static final String ADDRESS = "0.0.0.0";
+ private static int port = 7000;
+
+ private static class TestServer extends AsyncServer {
+ private boolean sleep;
+
+ public TestServer(int handlerCount, boolean sleep) throws IOException {
+ super(ADDRESS, port++, LongWritable.class, handlerCount, conf);
+ this.sleep = sleep;
+ }
+
+ @Override
+ public Writable call(Class<?> protocol, Writable param, long receiveTime)
+ throws IOException {
+ if (sleep) {
+ try {
+ Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit
+ } catch (InterruptedException e) {
+ }
+ }
+ return param; // echo param as result
+ }
+ }
+
+ private static class SerialCaller extends Thread {
+ private AsyncClient client;
+ private InetSocketAddress serverAddress;
+ private int count;
+ private boolean failed;
+
+ public SerialCaller(AsyncClient client, InetSocketAddress server, int
count) {
+ this.client = client;
+ this.serverAddress = server;
+ this.count = count;
+ }
+
+ @Override
+ public void run() {
+ for (int i = 0; i < count; i++) {
+ try {
+ LongWritable param = new LongWritable(RANDOM.nextLong());
+
+ LongWritable value = (LongWritable) client.call(param, serverAddress,
+ null, null, 0, conf);
+ if (!param.equals(value)) {
+ LOG.fatal("Call failed!");
+ failed = true;
+ break;
+ }
+
+ } catch (Exception e) {
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+ failed = true;
+ }
+ }
+ }
+ }
+
+ private static class ParallelCaller extends Thread {
+ private AsyncClient client;
+ private int count;
+ private InetSocketAddress[] addresses;
+ private boolean failed;
+
+ public ParallelCaller(AsyncClient client, InetSocketAddress[] addresses,
+ int count) {
+ this.client = client;
+ this.addresses = addresses;
+ this.count = count;
+ }
+
+ @Override
+ public void run() {
+ for (int i = 0; i < count; i++) {
+ try {
+ Writable[] params = new Writable[addresses.length];
+ for (int j = 0; j < addresses.length; j++)
+ params[j] = new LongWritable(RANDOM.nextLong());
+ Writable[] values = client.call(params, addresses, null, null, conf);
+
+ for (int j = 0; j < addresses.length; j++) {
+ if (!params[j].equals(values[j])) {
+ LOG.fatal("Call failed!");
+ failed = true;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
+ failed = true;
+ }
+ }
+ }
+ }
+
+ public void testSerial() throws Exception {
+ testSerial(3, false, 2, 5, 100);
+ }
+
+ public void testSerial(int handlerCount, boolean handlerSleep,
+ int clientCount, int callerCount, int callCount) throws Exception {
+ AsyncServer server = new TestServer(handlerCount, handlerSleep);
+ InetSocketAddress addr = server.getAddress();
+ server.start();
+
+ AsyncClient[] clients = new AsyncClient[clientCount];
+ for (int i = 0; i < clientCount; i++) {
+ clients[i] = new AsyncClient(LongWritable.class, conf);
+ }
+
+ SerialCaller[] callers = new SerialCaller[callerCount];
+ for (int i = 0; i < callerCount; i++) {
+ callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+ callers[i].start();
+ }
+ for (int i = 0; i < callerCount; i++) {
+ callers[i].join();
+ assertFalse(callers[i].failed);
+ }
+ for (int i = 0; i < clientCount; i++) {
+ clients[i].stop();
+ }
+ server.stop();
+ }
+
+ public void testParallel() throws Exception {
+ testParallel(10, false, 2, 4, 2, 4, 100);
+ }
+
+ public void testParallel(int handlerCount, boolean handlerSleep,
+ int serverCount, int addressCount, int clientCount, int callerCount,
+ int callCount) throws Exception {
+ AsyncServer[] servers = new AsyncServer[serverCount];
+ for (int i = 0; i < serverCount; i++) {
+ servers[i] = new TestServer(handlerCount, handlerSleep);
+ servers[i].start();
+ }
+
+ InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+ for (int i = 0; i < addressCount; i++) {
+ addresses[i] = servers[i % serverCount].address;
+ }
+
+ AsyncClient[] clients = new AsyncClient[clientCount];
+ for (int i = 0; i < clientCount; i++) {
+ clients[i] = new AsyncClient(LongWritable.class, conf);
+ }
+
+ ParallelCaller[] callers = new ParallelCaller[callerCount];
+ for (int i = 0; i < callerCount; i++) {
+ callers[i] = new ParallelCaller(clients[i % clientCount], addresses,
+ callCount);
+ callers[i].start();
+ }
+ for (int i = 0; i < callerCount; i++) {
+ callers[i].join();
+ assertFalse(callers[i].failed);
+ }
+ for (int i = 0; i < clientCount; i++) {
+ clients[i].stop();
+ }
+ for (int i = 0; i < serverCount; i++) {
+ servers[i].stop();
+ }
+ }
+}
Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java Tue Sep
30 00:44:14 2014
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+public class TestAsyncRPC extends TestCase {
+ private static final int PORT = 1234;
+ private static final String ADDRESS = "0.0.0.0";
+
+ public static final Log LOG = LogFactory
+ .getLog("org.apache.hama.ipc.TestRPCWithNetty");
+
+ private static Configuration conf = new Configuration();
+
+ public TestAsyncRPC(String name) {
+ super(name);
+ }
+
+ public interface TestProtocol extends VersionedProtocol {
+ public static final long versionID = 1L;
+
+ void ping() throws IOException;
+
+ String echo(String value) throws IOException;
+
+ String[] echo(String[] value) throws IOException;
+
+ Writable echo(Writable value) throws IOException;
+
+ int add(int v1, int v2) throws IOException;
+
+ int add(int[] values) throws IOException;
+
+ int error() throws IOException;
+
+ void testServerGet() throws IOException;
+ }
+
+ public class TestImpl implements TestProtocol {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TestProtocol.versionID;
+ }
+
+ @Override
+ public void ping() {
+ }
+
+ @Override
+ public String echo(String value) throws IOException {
+ return value;
+ }
+
+ @Override
+ public String[] echo(String[] values) throws IOException {
+ return values;
+ }
+
+ @Override
+ public Writable echo(Writable writable) {
+ return writable;
+ }
+
+ @Override
+ public int add(int v1, int v2) {
+ return v1 + v2;
+ }
+
+ @Override
+ public int add(int[] values) {
+ int sum = 0;
+ for (int i = 0; i < values.length; i++) {
+ sum += values[i];
+ }
+ return sum;
+ }
+
+ @Override
+ public int error() throws IOException {
+ throw new IOException("bobo");
+ }
+
+ @Override
+ public void testServerGet() throws IOException {
+ AsyncServer server = AsyncServer.get();
+ if (!(server instanceof AsyncRPC.NioServer)) {
+ throw new IOException("ServerWithNetty.get() failed");
+ }
+ }
+ }
+
+ public void testCalls() throws Exception {
+ AsyncServer server = AsyncRPC
+ .getServer(new TestImpl(), ADDRESS, PORT, conf);
+ server.start();
+
+ InetSocketAddress addr = new InetSocketAddress(PORT);
+ TestProtocol proxy = (TestProtocol) AsyncRPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
+
+ proxy.ping();
+
+ String stringResult = proxy.echo("foo");
+ assertEquals(stringResult, "foo");
+
+ stringResult = proxy.echo((String) null);
+ assertEquals(stringResult, null);
+
+ String[] stringResults = proxy.echo(new String[] { "foo", "bar" });
+ assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" }));
+
+ stringResults = proxy.echo((String[]) null);
+ assertTrue(Arrays.equals(stringResults, null));
+
+ int intResult = proxy.add(1, 2);
+ assertEquals(intResult, 3);
+
+ intResult = proxy.add(new int[] { 1, 2 });
+ assertEquals(intResult, 3);
+
+ boolean caught = false;
+ try {
+ proxy.error();
+ } catch (Exception e) {
+ LOG.debug("Caught " + e);
+ caught = true;
+ }
+ assertTrue(caught);
+
+ proxy.testServerGet();
+
+ // try some multi-calls
+ Method echo = TestProtocol.class.getMethod("echo",
+ new Class[] { String.class });
+ String[] strings = (String[]) AsyncRPC.call(echo, new String[][] { { "a" },
+ { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf);
+ assertTrue(Arrays.equals(strings, new String[] { "a", "b" }));
+
+ Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
+ Object[] voids = AsyncRPC.call(ping, new Object[][] { {}, {} },
+ new InetSocketAddress[] { addr, addr }, null, conf);
+ assertEquals(voids, null);
+
+ server.stop();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestAsyncRPC("test").testCalls();
+ }
+
+}
Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java
------------------------------------------------------------------------------
svn:mime-type = text/plain