Author: hairong
Date: Thu Nov 25 04:36:17 2010
New Revision: 1038918
URL: http://svn.apache.org/viewvc?rev=1038918&view=rev
Log:
HADOOP-6764. Add number of reader threads and queue length as configuration
parameters in RPC.getServer. Contributed by Dmytro Molkov.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Nov 25 04:36:17 2010
@@ -15,7 +15,10 @@ Trunk (unreleased changes)
improve other messaging. (nigel)
HADOOP-7001. Configuration changes can occur via the Reconfigurable
- interface. (Patrick Kline via dhruba)
+ interface. (Patrick Kling via dhruba)
+
+ HADOOP-6764. Add number of reader threads and queue length as
+ configuration parameters in RPC.getServer. (Dmytro Molkov via hairong)
OPTIMIZATIONS
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Nov 25 04:36:17 2010
@@ -211,14 +211,15 @@ public class AvroRpcEngine implements Rp
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
- int port, int numHandlers, boolean verbose,
+ int port, int numHandlers, int numReaders,
+ int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException {
return ENGINE.getServer(TunnelProtocol.class,
new TunnelResponder(iface, impl),
- bindAddress, port, numHandlers, verbose, conf,
- secretManager);
+ bindAddress, port, numHandlers, numReaders,
+ queueSizePerHandler, verbose, conf, secretManager);
}
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Thu Nov 25 04:36:17 2010
@@ -380,18 +380,33 @@ public class RPC {
throws IOException {
return getProtocolEngine(protocol, conf)
- .getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
- conf, secretManager);
+ .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
+ verbose, conf, secretManager);
+ }
+
+ /** Construct a server for a protocol implementation instance. */
+ public static Server getServer(Class<?> protocol,
+ Object instance, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+
+ return getProtocolEngine(protocol, conf)
+ .getServer(protocol, instance, bindAddress, port, numHandlers,
+ numReaders, queueSizePerHandler, verbose, conf, secretManager);
}
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
protected Server(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
+ Class<? extends Writable> paramClass, int handlerCount,
+ int numReaders, int queueSizePerHandler,
Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- super(bindAddress, port, paramClass, handlerCount, conf, serverName, secretManager);
+ super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
+ conf, serverName, secretManager);
}
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu Nov 25 04:36:17 2010
@@ -50,7 +50,8 @@ public interface RpcEngine {
/** Construct a server for a protocol implementation instance. */
RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
- int port, int numHandlers, boolean verbose,
+ int port, int numHandlers, int numReaders,
+ int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Nov 25 04:36:17 2010
@@ -1451,16 +1451,18 @@ public abstract class Server {
Configuration conf)
throws IOException
{
- this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null);
+ this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
}
+
/** Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls.
- *
+ * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
+ * from configuration. Otherwise the configuration will be picked up.
*/
@SuppressWarnings("unchecked")
protected Server(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
+ Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
@@ -1469,15 +1471,23 @@ public abstract class Server {
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
- this.maxQueueSize = handlerCount * conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ if (queueSizePerHandler != -1) {
+ this.maxQueueSize = queueSizePerHandler;
+ } else {
+ this.maxQueueSize = handlerCount * conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ }
this.maxRespSize = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
- this.readThreads = conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+ if (numReaders != -1) {
+ this.readThreads = numReaders;
+ } else {
+ this.readThreads = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+ }
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -1691,7 +1701,22 @@ public abstract class Server {
return callQueue.size();
}
-
+ /**
+ * The maximum size of the rpc call queue of this server.
+ * @return The maximum size of the rpc call queue.
+ */
+ public int getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ /**
+ * The number of reader threads for this server.
+ * @return The number of reader threads.
+ */
+ public int getNumReaders() {
+ return readThreads;
+ }
+
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Nov 25 04:36:17 2010
@@ -285,11 +285,12 @@ public class WritableRpcEngine implement
* port and address. */
public Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
- int numHandlers, boolean verbose, Configuration conf,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers,
- verbose, secretManager);
+ numReaders, queueSizePerHandler, verbose, secretManager);
}
/** An RPC Server. */
@@ -305,7 +306,7 @@ public class WritableRpcEngine implement
*/
public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
- this(instance, conf, bindAddress, port, 1, false, null);
+ this(instance, conf, bindAddress, port, 1, -1, -1, false, null);
}
private static String classNameBase(String className) {
@@ -325,10 +326,11 @@ public class WritableRpcEngine implement
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, boolean verbose,
+ int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, conf,
+ super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+ queueSizePerHandler, conf,
classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Thu Nov 25 04:36:17 2010
@@ -190,6 +190,28 @@ public class TestRPC extends TestCase {
}
}
}
+
+ public void testConfRpc() throws Exception {
+ Server server = RPC.getServer(TestProtocol.class,
+ new TestImpl(), ADDRESS, 0, 1, false, conf, null);
+ // Just one handler
+ int confQ = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ assertEquals(confQ, server.getMaxQueueSize());
+
+ int confReaders = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+ assertEquals(confReaders, server.getNumReaders());
+ server.stop();
+
+ server = RPC.getServer(TestProtocol.class,
+ new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null);
+ assertEquals(3, server.getNumReaders());
+ assertEquals(200, server.getMaxQueueSize());
+ server.stop();
+ }
public void testSlowRpc() throws Exception {
System.out.println("Testing Slow RPC");
@@ -234,6 +256,10 @@ public class TestRPC extends TestCase {
System.out.println("Down slow rpc testing");
}
}
+
+ public void testRPCConf(Configuration conf) throws Exception {
+
+ }
public void testCalls(Configuration conf) throws Exception {
Server server = RPC.getServer(TestProtocol.class,