Author: arp Date: Fri Nov 22 20:51:06 2013 New Revision: 1544672 URL: http://svn.apache.org/r1544672 Log: Merging r1544304 through r1544665 from trunk to branch HDFS-2832
Added: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/ - copied from r1544665, hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/ Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java?rev=1544672&r1=1544671&r2=1544672&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java Fri Nov 22 20:51:06 2013 @@ -17,42 +17,111 @@ */ package org.apache.hadoop.portmap; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcProgram; -import org.apache.hadoop.oncrpc.SimpleTcpServer; -import org.apache.hadoop.oncrpc.SimpleUdpServer; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.util.StringUtils; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.timeout.IdleStateHandler; +import org.jboss.netty.util.HashedWheelTimer; + +import com.google.common.annotations.VisibleForTesting; /** * Portmap service for binding RPC protocols. See RFC 1833 for details. */ -public class Portmap { - public static final Log LOG = LogFactory.getLog(Portmap.class); - - private static void startUDPServer(RpcProgramPortmap rpcProgram) { - rpcProgram.register(PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT); - SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT, - rpcProgram, 1); - udpServer.run(); - } +final class Portmap { + private static final Log LOG = LogFactory.getLog(Portmap.class); + private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000; - private static void startTCPServer(final RpcProgramPortmap rpcProgram) { - rpcProgram.register(PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); - SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT, - rpcProgram, 1); - tcpServer.run(); - } + private ConnectionlessBootstrap udpServer; + private ServerBootstrap tcpServer; + private ChannelGroup allChannels = new DefaultChannelGroup(); + private Channel udpChannel; + private Channel tcpChannel; + private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels); public static void main(String[] args) { StringUtils.startupShutdownMessage(Portmap.class, args, LOG); - RpcProgramPortmap program = new RpcProgramPortmap(); + + final int port = RpcProgram.RPCB_PORT; + Portmap pm = new Portmap(); try { - startUDPServer(program); - startTCPServer(program); + pm.start(DEFAULT_IDLE_TIME_MILLISECONDS, + new InetSocketAddress(port), new InetSocketAddress(port)); } catch (Throwable e) { - LOG.fatal("Start server failure"); + LOG.fatal("Failed to start the server. Cause:" + e.getMessage()); + pm.shutdown(); System.exit(-1); } } + + void shutdown() { + allChannels.close().awaitUninterruptibly(); + tcpServer.releaseExternalResources(); + udpServer.releaseExternalResources(); + } + + @VisibleForTesting + SocketAddress getTcpServerLocalAddress() { + return tcpChannel.getLocalAddress(); + } + + @VisibleForTesting + SocketAddress getUdpServerLoAddress() { + return udpChannel.getLocalAddress(); + } + + @VisibleForTesting + RpcProgramPortmap getHandler() { + return handler; + } + + void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress, + final SocketAddress udpAddress) { + + tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + tcpServer.setPipelineFactory(new ChannelPipelineFactory() { + private final HashedWheelTimer timer = new HashedWheelTimer(); + private final IdleStateHandler idleStateHandler = new IdleStateHandler( + timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); + + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), + RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, + RpcUtil.STAGE_RPC_TCP_RESPONSE); + } + }); + + udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory( + Executors.newCachedThreadPool())); + + udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER, + handler, RpcUtil.STAGE_RPC_UDP_RESPONSE)); + + tcpChannel = tcpServer.bind(tcpAddress); + udpChannel = udpServer.bind(udpAddress); + allChannels.add(tcpChannel); + allChannels.add(udpChannel); + + LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress() + + ", udp://" + udpChannel.getLocalAddress()); + } } Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java?rev=1544672&r1=1544671&r2=1544672&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java Fri Nov 22 20:51:06 2013 @@ -17,9 +17,6 @@ */ package org.apache.hadoop.portmap; -import java.util.Arrays; -import java.util.Collection; - import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; @@ -45,18 +42,13 @@ public class PortmapResponse { return xdr; } - public static XDR pmapList(XDR xdr, int xid, Collection<PortmapMapping> list) { + public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) { RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr); for (PortmapMapping mapping : list) { - System.out.println(mapping); xdr.writeBoolean(true); // Value follows mapping.serialize(xdr); } xdr.writeBoolean(false); // No value follows return xdr; } - - public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) { - return pmapList(xdr, xid, Arrays.asList(list)); - } } Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1544672&r1=1544671&r2=1544672&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Fri Nov 22 20:51:06 2013 @@ -18,8 +18,6 @@ package org.apache.hadoop.portmap; import java.util.HashMap; -import java.util.Map.Entry; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,31 +32,34 @@ import org.apache.hadoop.oncrpc.security import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; - -/** - * An rpcbind request handler. - */ -public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { - public static final int PROGRAM = 100000; - public static final int VERSION = 2; - +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.handler.timeout.IdleState; +import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler; +import org.jboss.netty.handler.timeout.IdleStateEvent; + +final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface { + static final int PROGRAM = 100000; + static final int VERSION = 2; private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class); /** Map synchronized usis monitor lock of this instance */ private final HashMap<String, PortmapMapping> map; - public RpcProgramPortmap() { - super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION); - map = new HashMap<String, PortmapMapping>(256); - } + /** ChannelGroup that remembers all active channels for gracefully shutdown. */ + private final ChannelGroup allChannels; - /** Dump all the register RPC services */ - private synchronized void dumpRpcServices() { - Set<Entry<String, PortmapMapping>> entrySet = map.entrySet(); - for (Entry<String, PortmapMapping> entry : entrySet) { - LOG.info("Service: " + entry.getKey() + " portmapping: " - + entry.getValue()); - } + RpcProgramPortmap(ChannelGroup allChannels) { + this.allChannels = allChannels; + map = new HashMap<String, PortmapMapping>(256); + PortmapMapping m = new PortmapMapping(PROGRAM, VERSION, + PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); + PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION, + PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT); + map.put(PortmapMapping.key(m), m); + map.put(PortmapMapping.key(m1), m1); } @Override @@ -77,7 +78,6 @@ public class RpcProgramPortmap extends R PortmapMapping value = null; synchronized(this) { map.put(key, mapping); - dumpRpcServices(); value = map.get(key); } return PortmapResponse.intReply(out, xid, value.getPort()); @@ -126,21 +126,15 @@ public class RpcProgramPortmap extends R } @Override - public void register(PortmapMapping mapping) { - String key = PortmapMapping.key(mapping); - synchronized(this) { - map.put(key, mapping); - } - } + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { - @Override - public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcInfo info = (RpcInfo) e.getMessage(); RpcCall rpcCall = (RpcCall) info.header(); final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); - byte[] data = new byte[info.data().readableBytes()]; - info.data().readBytes(data); - XDR in = new XDR(data); + XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(), + XDR.State.READING); XDR out = new XDR(); if (portmapProc == Procedure.PMAPPROC_NULL) { @@ -162,13 +156,29 @@ public class RpcProgramPortmap extends R reply.write(out); } - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @Override - protected boolean isIdempotent(RpcCall call) { - return false; + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + allChannels.add(e.getChannel()); + } + + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) + throws Exception { + if (e.getState() == IdleState.ALL_IDLE) { + e.getChannel().close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + LOG.warn("Encountered ", e.getCause()); + e.getChannel().close(); } }