Author: brandonli Date: Mon Sep 30 19:21:17 2013 New Revision: 1527726 URL: http://svn.apache.org/r1527726 Log: HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. Contributed by Haohui Mai
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1527726&r1=1527725&r2=1527726&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Mon Sep 30 19:21:17 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.jboss.netty.channel.Channel; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandlerContext; /** * RPC program corresponding to mountd daemon. See {@link Mountd}. @@ -77,7 +83,7 @@ public class RpcProgramMountd extends Rp throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT), - PROGRAM, VERSION_1, VERSION_3, 0); + PROGRAM, VERSION_1, VERSION_3); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); @@ -173,10 +179,16 @@ public class RpcProgramMountd extends Rp } @Override - public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR xdr = new XDR(data); + XDR out = new XDR(); + InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress(); + if (mntproc == MNTPROC.NULL) { out = nullOp(out, xid, client); } else if (mntproc == MNTPROC.MNT) { @@ -198,7 +210,9 @@ public class RpcProgramMountd extends Rp RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - return out; + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); } @Override Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1527726&r1=1527725&r2=1527726&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Mon Sep 30 19:21:17 2013 @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; @@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.respon import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcCallCache; import org.apache.hadoop.oncrpc.RpcDeniedReply; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcReply; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.Credentials; import org.apache.hadoop.oncrpc.security.CredentialsSys; @@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security import org.apache.hadoop.oncrpc.security.Verifier; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.AccessControlException; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; /** * RPC program corresponding to nfs daemon. See {@link Nfs3}. @@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcP private Statistics statistics; private String writeDumpDir; // The dir save dump files + private final RpcCallCache rpcCallCache; + public RpcProgramNfs3() throws IOException { this(new Configuration()); } - public RpcProgramNfs3(Configuration config) - throws IOException { + public RpcProgramNfs3(Configuration config) throws IOException { super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM, - Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100); + Nfs3Constant.VERSION, Nfs3Constant.VERSION); config.set(FsPermission.UMASK_LABEL, "000"); iug = new IdUserGroup(); @@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcP } else { clearDirectory(writeDumpDir); } + + rpcCallCache = new RpcCallCache("NFS3", 256); } private void clearDirectory(String writeDumpDir) throws IOException { @@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public GETATTR3Response getattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public SETATTR3Response setattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public LOOKUP3Response lookup(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public ACCESS3Response access(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcP long offset = request.getOffset(); int count = request.getCount(); - FileHandle handle = request.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset @@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public CREATE3Response create(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public CREATE3Response create(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcP } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr); @@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcP } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc); } @@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public RENAME3Response rename(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcP } } - public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); } @Override - public READDIR3Response readdir(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public FSSTAT3Response fsstat(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public FSINFO3Response fsinfo(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public PATHCONF3Response pathconf(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcP } @Override - public COMMIT3Response commit(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcP } @Override - public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR xdr = new XDR(data); + XDR out = new XDR(); + InetAddress client = ((InetSocketAddress) info.remoteAddress()) + .getAddress(); + Channel channel = info.channel(); Credentials credentials = rpcCall.getCredential(); // Ignore auth only for NFSPROC3_NULL, especially for Linux clients. if (nfsproc3 != NFSPROC3.NULL) { - if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS - && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) { - LOG.info("Wrong RPC AUTH flavor, " - + rpcCall.getCredential().getFlavor() + if (credentials.getFlavor() != AuthFlavor.AUTH_SYS + && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) { + LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor() + " is not AUTH_SYS or RPCSEC_GSS."); XDR reply = new XDR(); RpcDeniedReply rdr = new RpcDeniedReply(xid, RpcReply.ReplyState.MSG_ACCEPTED, RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); rdr.write(reply); - return reply; + + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); + return; + } + } + + if (!isIdempotent(rpcCall)) { + RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client, + xid); + if (entry != null) { // in cache + if (entry.isCompleted()) { + LOG.info("Sending the cached reply to retransmitted request " + xid); + RpcUtil.sendRpcResponse(ctx, entry.getResponse()); + return; + } else { // else request is in progress + LOG.info("Retransmitted request, transaction still in progress " + + xid); + // Ignore the request and do nothing + return; + } } } @@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcP RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - if (response != null) { - // TODO: currently we just return VerifierNone - out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); + if (response == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No sync response, expect an async response for request XID=" + + rpcCall.getXid()); + } + return; + } + // TODO: currently we just return VerifierNone + out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + + if (!isIdempotent(rpcCall)) { + rpcCallCache.callCompleted(client, xid, rsp); } - return out; + RpcUtil.sendRpcResponse(ctx, rsp); } @Override Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1527726&r1=1527725&r2=1527726&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 30 19:21:17 2013 @@ -345,6 +345,9 @@ Release 2.1.2 - UNRELEASED NEW FEATURES + HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. + (Haohui Mai via brandonli) + IMPROVEMENTS HDFS-5246. Make Hadoop nfs server port and mount daemon port