Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Mon Feb 27 04:54:33 2012 @@ -34,7 +34,6 @@ public interface VersionedProtocol { * @return the version that the server will speak * @throws IOException if any IO error occurs */ - @Deprecated public long getProtocolVersion(String protocol, long clientVersion) throws IOException;
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Feb 27 04:54:33 2012 @@ -18,23 +18,23 @@ package org.apache.hadoop.ipc; -import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.lang.reflect.Method; import java.lang.reflect.Array; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.io.*; -import java.util.Map; -import java.util.HashMap; import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RPC.RpcInvoker; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -49,8 +49,38 @@ public class WritableRpcEngine implement //writableRpcVersion should be updated if there is a change //in format of the rpc messages. - public static long writableRpcVersion = 1L; + + // 2L - added declared class to Invocation + public static final long writableRpcVersion = 2L; + + /** + * Whether or not this class has been initialized. 
+ */ + private static boolean isInitialized = false; + + static { + ensureInitialized(); + } + + /** + * Initialize this class if it isn't already. + */ + public static synchronized void ensureInitialized() { + if (!isInitialized) { + initialize(); + } + } + + /** + * Register the rpcRequest deserializer for WritableRpcEngine + */ + private static synchronized void initialize() { + org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE, + Invocation.class, new Server.WritableRpcInvoker()); + isInitialized = true; + } + /** A method invocation, including the method name and its parameters.*/ private static class Invocation implements Writable, Configurable { private String methodName; @@ -59,11 +89,13 @@ public class WritableRpcEngine implement private Configuration conf; private long clientVersion; private int clientMethodsHash; + private String declaringClassProtocolName; //This could be different from static writableRpcVersion when received //at server, if client is using a different version. private long rpcVersion; + @SuppressWarnings("unused") // called when deserializing an invocation public Invocation() {} public Invocation(Method method, Object[] parameters) { @@ -76,18 +108,12 @@ public class WritableRpcEngine implement clientVersion = 0; clientMethodsHash = 0; } else { - try { - Field versionField = method.getDeclaringClass().getField("versionID"); - versionField.setAccessible(true); - this.clientVersion = versionField.getLong(method.getDeclaringClass()); - } catch (NoSuchFieldException ex) { - throw new RuntimeException(ex); - } catch (IllegalAccessException ex) { - throw new RuntimeException(ex); - } + this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); this.clientMethodsHash = ProtocolSignature.getFingerprint(method .getDeclaringClass().getMethods()); } + this.declaringClassProtocolName = + RPC.getProtocolName(method.getDeclaringClass()); } /** The name of the method invoked. 
*/ @@ -103,6 +129,7 @@ public class WritableRpcEngine implement return clientVersion; } + @SuppressWarnings("unused") private int getClientMethodsHash() { return clientMethodsHash; } @@ -115,8 +142,10 @@ public class WritableRpcEngine implement return rpcVersion; } + @SuppressWarnings("deprecation") public void readFields(DataInput in) throws IOException { rpcVersion = in.readLong(); + declaringClassProtocolName = UTF8.readString(in); methodName = UTF8.readString(in); clientVersion = in.readLong(); clientMethodsHash = in.readInt(); @@ -124,13 +153,16 @@ public class WritableRpcEngine implement parameterClasses = new Class[parameters.length]; ObjectWritable objectWritable = new ObjectWritable(); for (int i = 0; i < parameters.length; i++) { - parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); + parameters[i] = + ObjectWritable.readObject(in, objectWritable, this.conf); parameterClasses[i] = objectWritable.getDeclaredClass(); } } + @SuppressWarnings("deprecation") public void write(DataOutput out) throws IOException { out.writeLong(rpcVersion); + UTF8.writeString(out, declaringClassProtocolName); UTF8.writeString(out, methodName); out.writeLong(clientVersion); out.writeInt(clientMethodsHash); @@ -169,7 +201,7 @@ public class WritableRpcEngine implement private static ClientCache CLIENTS=new ClientCache(); - private static class Invoker implements InvocationHandler { + private static class Invoker implements RpcInvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; @@ -191,7 +223,7 @@ public class WritableRpcEngine implement } ObjectWritable value = (ObjectWritable) - client.call(new Invocation(method, args), remoteId); + client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -200,12 +232,17 @@ public class WritableRpcEngine 
implement } /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized private void close() { + synchronized public void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } + + @Override + public ConnectionId getConnectionId() { + return remoteId; + } } // for unit testing only @@ -231,15 +268,6 @@ public class WritableRpcEngine implement factory, rpcTimeout)); return new ProtocolProxy<T>(protocol, proxy, true); } - - /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped - */ - public void stopProxy(Object proxy) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - /** Expert: Make multiple, parallel calls to a set of servers. */ public Object[] call(Method method, Object[][] params, @@ -273,134 +301,238 @@ public class WritableRpcEngine implement /** Construct a server for a protocol implementation instance listening on a * port and address. */ - public Server getServer(Class<?> protocol, - Object instance, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, Configuration conf, + public RPC.Server getServer(Class<?> protocolClass, + Object protocolImpl, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { - return new Server(instance, conf, bindAddress, port, numHandlers, - numReaders, queueSizePerHandler, verbose, secretManager); + return new Server(protocolClass, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); } + /** An RPC Server. */ public static class Server extends RPC.Server { - private Object instance; - private boolean verbose; - - /** Construct an RPC server. + /** + * Construct an RPC server. 
* @param instance the instance whose methods will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on + * + * @deprecated Use #Server(Class, Object, Configuration, String, int) + */ + @Deprecated + public Server(Object instance, Configuration conf, String bindAddress, + int port) throws IOException { + this(null, instance, conf, bindAddress, port); + } + + + /** Construct an RPC server. + * @param protocolClass class + * @param protocolImpl the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on */ - public Server(Object instance, Configuration conf, String bindAddress, int port) + public Server(Class<?> protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port) throws IOException { - this(instance, conf, bindAddress, port, 1, -1, -1, false, null); + this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, + false, null); } - private static String classNameBase(String className) { - String[] names = className.split("\\.", -1); - if (names == null || names.length == 0) { - return className; - } - return names[names.length-1]; + /** + * Construct an RPC server. 
+ * @param protocolImpl the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * + * @deprecated use Server#Server(Class, Object, + * Configuration, String, int, int, int, int, boolean, SecretManager) + */ + @Deprecated + public Server(Object protocolImpl, Configuration conf, String bindAddress, + int port, int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) + throws IOException { + this(null, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, + secretManager); + } - /** Construct an RPC server. - * @param instance the instance whose methods will be called + /** + * Construct an RPC server. + * @param protocolClass - the protocol being registered + * can be null for compatibility with old usage (see below for details) + * @param protocolImpl the protocol impl that will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on * @param numHandlers the number of method handler threads to run * @param verbose whether each call should be logged */ - public Server(Object instance, Configuration conf, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, - SecretManager<? extends TokenIdentifier> secretManager) + public Server(Class<?> protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager<? 
extends TokenIdentifier> secretManager) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, numReaders, + super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, - classNameBase(instance.getClass().getName()), secretManager); - this.instance = instance; + classNameBase(protocolImpl.getClass().getName()), secretManager); + this.verbose = verbose; - } + + + Class<?>[] protocols; + if (protocolClass == null) { // derive protocol from impl + /* + * In order to remain compatible with the old usage where a single + * target protocolImpl is supplied for all protocol interfaces, and + * the protocolImpl is derived from the protocolClass(es) + * we register all interfaces extended by the protocolImpl + */ + protocols = RPC.getProtocolInterfaces(protocolImpl.getClass()); - public Writable call(Class<?> protocol, Writable param, long receivedTime) - throws IOException { - try { - Invocation call = (Invocation)param; - if (verbose) log("Call: " + call); - - Method method = protocol.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - - // Verify rpc version - if (call.getRpcVersion() != writableRpcVersion) { - // Client is using a different version of WritableRpc - throw new IOException( - "WritableRpc version mismatch, client side version=" - + call.getRpcVersion() + ", server side version=" - + writableRpcVersion); + } else { + if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { + throw new IOException("protocolClass "+ protocolClass + + " is not implemented by protocolImpl which is of class " + + protocolImpl.getClass()); } - - //Verify protocol version. 
- //Bypass the version check for VersionedProtocol - if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { + // register protocol class and its super interfaces + registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); + protocols = RPC.getProtocolInterfaces(protocolClass); + } + for (Class<?> p : protocols) { + if (!p.equals(VersionedProtocol.class)) { + registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl); + } + } + + } + + private static void log(String value) { + if (value!= null && value.length() > 55) + value = value.substring(0, 55)+"..."; + LOG.info(value); + } + + static class WritableRpcInvoker implements RpcInvoker { + + @Override + public Writable call(org.apache.hadoop.ipc.RPC.Server server, + String protocolName, Writable rpcRequest, long receivedTime) + throws IOException { + try { + Invocation call = (Invocation)rpcRequest; + if (server.verbose) log("Call: " + call); + + // Verify rpc version + if (call.getRpcVersion() != writableRpcVersion) { + // Client is using a different version of WritableRpc + throw new IOException( + "WritableRpc version mismatch, client side version=" + + call.getRpcVersion() + ", server side version=" + + writableRpcVersion); + } + long clientVersion = call.getProtocolVersion(); - ProtocolSignature serverInfo = ((VersionedProtocol) instance) - .getProtocolSignature(protocol.getCanonicalName(), call - .getProtocolVersion(), call.getClientMethodsHash()); - long serverVersion = serverInfo.getVersion(); - if (serverVersion != clientVersion) { - LOG.warn("Version mismatch: client version=" + clientVersion - + ", server version=" + serverVersion); - throw new RPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); + final String protoName; + ProtoClassProtoImpl protocolImpl; + if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { + // VersionProtocol methods are often used by client to figure out + // which version of protocol to use. 
+ // + // Versioned protocol methods should go to the protocolName protocol + // rather than the declaring class of the method since + // the declaring class is VersionedProtocol which is not + // registered directly. + // Send the call to the highest protocol version + VerProtocolImpl highest = server.getHighestSupportedProtocol( + RpcKind.RPC_WRITABLE, protocolName); + if (highest == null) { + throw new IOException("Unknown protocol: " + protocolName); + } + protocolImpl = highest.protocolTarget; + } else { + protoName = call.declaringClassProtocolName; + + // Find the right impl for the protocol based on client version. + ProtoNameVer pv = + new ProtoNameVer(call.declaringClassProtocolName, clientVersion); + protocolImpl = + server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv); + if (protocolImpl == null) { // no match for Protocol AND Version + VerProtocolImpl highest = + server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, + protoName); + if (highest == null) { + throw new IOException("Unknown protocol: " + protoName); + } else { // protocol supported but not the version that client wants + throw new RPC.VersionMismatch(protoName, clientVersion, + highest.version); + } + } } - } + - long startTime = System.currentTimeMillis(); - Object value = method.invoke(instance, call.getParameters()); - int processingTime = (int) (System.currentTimeMillis() - startTime); - int qTime = (int) (startTime-receivedTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Served: " + call.getMethodName() + - " queueTime= " + qTime + - " procesingTime= " + processingTime); - } - rpcMetrics.addRpcQueueTime(qTime); - rpcMetrics.addRpcProcessingTime(processingTime); - rpcDetailedMetrics.addProcessingTime(call.getMethodName(), - processingTime); - if (verbose) log("Return: "+value); - - return new ObjectWritable(method.getReturnType(), value); - - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - throw 
(IOException)target; - } else { - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); + // Invoke the protocol method + + long startTime = System.currentTimeMillis(); + Method method = + protocolImpl.protocolClass.getMethod(call.getMethodName(), + call.getParameterClasses()); + method.setAccessible(true); + server.rpcDetailedMetrics.init(protocolImpl.protocolClass); + Object value = + method.invoke(protocolImpl.protocolImpl, call.getParameters()); + int processingTime = (int) (System.currentTimeMillis() - startTime); + int qTime = (int) (startTime-receivedTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Served: " + call.getMethodName() + + " queueTime= " + qTime + + " procesingTime= " + processingTime); + } + server.rpcMetrics.addRpcQueueTime(qTime); + server.rpcMetrics.addRpcProcessingTime(processingTime); + server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(), + processingTime); + if (server.verbose) log("Return: "+value); + + return new ObjectWritable(method.getReturnType(), value); + + } catch (InvocationTargetException e) { + Throwable target = e.getTargetException(); + if (target instanceof IOException) { + throw (IOException)target; + } else { + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + throw ioe; + } + } catch (Throwable e) { + if (!(e instanceof IOException)) { + LOG.error("Unexpected throwable object ", e); + } + IOException ioe = new IOException(e.toString()); + ioe.setStackTrace(e.getStackTrace()); throw ioe; } - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - throw ioe; } } } - private static void log(String value) { - if (value!= null && value.length() > 55) - value = value.substring(0, 55)+"..."; - LOG.info(value); + @Override + public ProtocolProxy<ProtocolMetaInfoPB> 
getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); } } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java Mon Feb 27 04:54:33 2012 @@ -42,15 +42,20 @@ public class DelegationKey implements Wr @Nullable private byte[] keyBytes = null; + /** Default constructor required for Writable */ public DelegationKey() { - this(0, 0L, null); + this(0, 0L, (SecretKey)null); } public DelegationKey(int keyId, long expiryDate, SecretKey key) { + this(keyId, expiryDate, key != null ? 
key.getEncoded() : null); + } + + public DelegationKey(int keyId, long expiryDate, byte[] encodedKey) { this.keyId = keyId; this.expiryDate = expiryDate; - if (key!=null) { - this.keyBytes = key.getEncoded(); + if (encodedKey != null) { + this.keyBytes = encodedKey; } } @@ -70,6 +75,10 @@ public class DelegationKey implements Wr return key; } } + + public byte[] getEncodedKey() { + return keyBytes; + } public void setExpiryDate(long expiryDate) { this.expiryDate = expiryDate; Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/GetGroupsBase.java Mon Feb 27 04:54:33 2012 @@ -94,7 +94,7 @@ public abstract class GetGroupsBase exte * @return A {@link GetUserMappingsProtocol} client proxy. 
* @throws IOException */ - private GetUserMappingsProtocol getUgmProtocol() throws IOException { + protected GetUserMappingsProtocol getUgmProtocol() throws IOException { GetUserMappingsProtocol userGroupMappingProtocol = RPC.getProxy(GetUserMappingsProtocol.class, GetUserMappingsProtocol.versionID, Propchange: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 27 04:54:33 2012 @@ -1,3 +1,4 @@ +/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/core:1227776-1294021 /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1166009,1166402,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1177487,1177531,1177859,1177864,1182189,1182205,1182214,1183132,1189613,1189932,1189982,1195575,1196113,1196129,1204114,1204117,1204122,1204124,1204129,1204131,1204177,1204370,1204376,1204388,1205260,1205697,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1225114,1225192,1225456,1225489,1225591,1226211,1226239,1226350,1227091,1227165,1227423,1227964,1229347,1230398,1231569,1231572,1231627,1231640,1233605,1234555,1235135,1235137,1235956,1236456,1239752,1240897,1240928,1243065,1243104,1244766,1245751,1245762,1293419 /hadoop/core/branches/branch-0.19/core/src/test/core:713112 /hadoop/core/trunk/src/test/core:776175-785643,785929-786278 Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Mon Feb 27 04:54:33 2012 @@ -57,6 +57,11 @@ public class TestFailoverProxy { public Class<?> getInterface() { return iface; } + + @Override + public void close() throws IOException { + // Nothing to do. + } } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Feb 27 04:54:33 2012 @@ -23,6 +23,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.NetUtils; @@ -96,8 +97,8 @@ public class TestIPC { } @Override - public Writable call(Class<?> protocol, Writable param, long receiveTime) - throws IOException { + public Writable call(RpcKind rpcKind, String protocol, 
Writable param, + long receiveTime) throws IOException { if (sleep) { // sleep a bit try { Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon Feb 27 04:54:33 2012 @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; /** @@ -72,8 +73,8 @@ public class TestIPCServerResponder exte } @Override - public Writable call(Class<?> protocol, Writable param, long receiveTime) - throws IOException { + public Writable call(RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws IOException { if (sleep) { try { Thread.sleep(RANDOM.nextInt(20)); // sleep a bit Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Feb 27 04:54:33 2012 @@ -18,28 +18,39 @@ package org.apache.hadoop.ipc; +import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Arrays; -import junit.framework.TestCase; +import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; +import static org.junit.Assert.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; @@ -49,18 +60,22 @@ import static org.apache.hadoop.test.Met import static org.mockito.Mockito.*; /** Unit tests for RPC. 
*/ -public class TestRPC extends TestCase { +@SuppressWarnings("deprecation") +public class TestRPC { private static final String ADDRESS = "0.0.0.0"; public static final Log LOG = LogFactory.getLog(TestRPC.class); private static Configuration conf = new Configuration(); + + static { + conf.setClass("rpc.engine." + StoppedProtocol.class.getName(), + StoppedRpcEngine.class, RpcEngine.class); + } int datasize = 1024*100; int numThreads = 50; - - public TestRPC(String name) { super(name); } public interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; @@ -207,6 +222,80 @@ public class TestRPC extends TestCase { } } + /** + * A basic interface for testing client-side RPC resource cleanup. + */ + private static interface StoppedProtocol { + long versionID = 0; + + public void stop(); + } + + /** + * A class used for testing cleanup of client side RPC resources. + */ + private static class StoppedRpcEngine implements RpcEngine { + + @Override + public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, + UserGroupInformation ticket, Configuration conf) + throws IOException, InterruptedException { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new StoppedInvocationHandler()); + return new ProtocolProxy<T>(protocol, proxy, false); + } + + @Override + public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol, + Object instance, String bindAddress, int port, int numHandlers, + int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, + SecretManager<? 
extends TokenIdentifier> secretManager) throws IOException { + return null; + } + + @Override + public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } + } + + /** + * An invocation handler which does nothing when invoking methods, and just + * counts the number of times close() is called. + */ + private static class StoppedInvocationHandler + implements InvocationHandler, Closeable { + + private int closeCalled = 0; + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return null; + } + + @Override + public void close() throws IOException { + closeCalled++; + } + + public int getCloseCalled() { + return closeCalled; + } + + } + + @Test public void testConfRpc() throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 1, false, conf, null); @@ -229,6 +318,7 @@ public class TestRPC extends TestCase { server.stop(); } + @Test public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); // create a server with two handlers @@ -273,11 +363,12 @@ public class TestRPC extends TestCase { } } - public void testRPCConf(Configuration conf) throws Exception { - + @Test + public void testCalls() throws Exception { + testCallsInternal(conf); } - - public void testCalls(Configuration conf) throws Exception { + + private void testCallsInternal(Configuration conf) throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, conf); TestProtocol proxy = null; @@ -384,6 +475,7 @@ public class TestRPC extends TestCase { } } + @Test public void testStandaloneClient() throws IOException { try { TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, @@ -450,6 +542,7 @@ public class TestRPC extends TestCase { } } + @Test public void testAuthorization() throws 
Exception { Configuration conf = new Configuration(); conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, @@ -481,20 +574,48 @@ public class TestRPC extends TestCase { Configuration conf = new Configuration(); conf.setBoolean("ipc.client.ping", false); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); } /** * Test stopping a non-registered proxy * @throws Exception */ + @Test public void testStopNonRegisteredProxy() throws Exception { RPC.stopProxy(mock(TestProtocol.class)); } + @Test + public void testStopProxy() throws IOException { + StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(proxy); + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test + public void testWrappedStopProxy() throws IOException { + StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(wrappedProxy); + + StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class, + wrappedProxy, RetryPolicies.RETRY_FOREVER); + + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test public void testErrorMsgForInsecureClient() throws Exception { final Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null); @@ -567,10 +688,10 @@ public class TestRPC extends TestCase { return count; } - /** * Test that 
server.stop() properly stops all threads */ + @Test public void testStopsAllThreads() throws Exception { int threadsBefore = countThreads("Server$Listener$Reader"); assertEquals("Expect no Reader threads running before test", @@ -591,8 +712,7 @@ public class TestRPC extends TestCase { } public static void main(String[] args) throws Exception { - - new TestRPC("test").testCalls(conf); + new TestRPC().testCallsInternal(conf); } } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1294028&r1=1294027&r2=1294028&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon Feb 27 04:54:33 2012 @@ -31,6 +31,10 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.net.NetUtils; import org.junit.After; import org.junit.Test; @@ -39,7 +43,7 @@ import org.junit.Test; public class TestRPCCompatibility { private static final String ADDRESS = "0.0.0.0"; private static InetSocketAddress addr; - private static Server server; + private static RPC.Server server; 
private ProtocolProxy<?> proxy; public static final Log LOG = @@ -52,10 +56,14 @@ public class TestRPCCompatibility { void ping() throws IOException; } - public interface TestProtocol1 extends TestProtocol0 { + public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 { String echo(String value) throws IOException; } + + // TestProtocol2 is a compatible impl of TestProtocol1 - hence use its name + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol2 extends TestProtocol1 { int echo(int value) throws IOException; } @@ -89,28 +97,44 @@ public class TestRPCCompatibility { public static class TestImpl1 extends TestImpl0 implements TestProtocol1 { @Override public String echo(String value) { return value; } + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol1.versionID; + } } public static class TestImpl2 extends TestImpl1 implements TestProtocol2 { @Override public int echo(int value) { return value; } + + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol2.versionID; + } + } @After public void tearDown() throws IOException { if (proxy != null) { RPC.stopProxy(proxy.getProxy()); + proxy = null; } if (server != null) { server.stop(); + server = null; } } @Test // old client vs new server public void testVersion0ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -154,8 +178,10 @@ public class TestRPCCompatibility { public int echo(int value) throws IOException, NumberFormatException { if 
(serverInfo.isMethodSupported("echo", int.class)) { +System.out.println("echo int is supported"); return -value; // use version 3 echo long } else { // server is version 2 +System.out.println("echo int is NOT supported"); return Integer.parseInt(proxy2.echo(String.valueOf(value))); } } @@ -172,8 +198,10 @@ public class TestRPCCompatibility { @Test // Compatible new client & old server public void testVersion2ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -189,9 +217,12 @@ public class TestRPCCompatibility { @Test // equal version client and server public void testVersion2ClientVersion2Server() throws Exception { + ProtocolSignature.resetCache(); // create a server with two handlers + TestImpl2 impl = new TestImpl2(); server = RPC.getServer(TestProtocol2.class, - new TestImpl2(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -250,14 +281,16 @@ public class TestRPCCompatibility { assertEquals(hash1, hash2); } + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol4 extends TestProtocol2 { - public static final long versionID = 1L; + public static final long versionID = 4L; int echo(int value) throws IOException; } @Test public void testVersionMismatch() throws IOException { - server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2, + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, false, conf, null); server.start(); addr = NetUtils.getConnectAddress(server); 
@@ -268,7 +301,76 @@ public class TestRPCCompatibility { proxy.echo(21); fail("The call must throw VersionMismatch exception"); } catch (IOException ex) { - Assert.assertTrue(ex.getMessage().contains("VersionMismatch")); + Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), + ex.getMessage().contains("VersionMismatch")); + } + } + + @Test + public void testIsMethodSupported() throws IOException { + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, + false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. 
+ */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, + conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class<?> protocol, RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); } } \ No newline at end of file Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1294028&r1=1294027&r2=1294028&view=diff 
============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Mon Feb 27 04:54:33 2012 @@ -164,6 +164,10 @@ public abstract class MultithreadedTestU } checkException(); } + + public Iterable<? extends Thread> getTestThreads() { + return testThreads; + } } /**
