Author: hairong
Date: Mon May 5 14:24:25 2008
New Revision: 653607
URL: http://svn.apache.org/viewvc?rev=653607&view=rev
Log:
HADOOP-2188. RPC should send a ping rather than use client timeouts.
Contributed by Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 5 14:24:25 2008
@@ -31,6 +31,12 @@
xxxID.toString() and xxxID.forName() methods to convert/restore objects
to/from strings. (Enis Soztutar via ddas)
+ HADOOP-2188. RPC client sends a ping rather than throwing timeouts.
+ RPC server does not throw away old RPCs. Clients and servers running
+ different versions will not work correctly together. In addition,
+ the property ipc.client.timeout is removed from the default hadoop
+ configuration, and the metric RpcOpsDiscardedOPsNum is removed. (hairong)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Mon May 5 14:24:25 2008
@@ -1060,12 +1060,6 @@
<!-- ipc properties -->
<property>
- <name>ipc.client.timeout</name>
- <value>60000</value>
- <description>Defines the timeout for IPC calls in milliseconds.</description>
-</property>
-
-<property>
<name>ipc.client.idlethreshold</name>
<value>4000</value>
<description>Defines the threshold number of connections after which
@@ -1076,7 +1070,7 @@
<property>
<name>ipc.client.maxidletime</name>
<value>120000</value>
- <description>Defines the maximum idle time for a connected client after
+ <description>Defines the maximum idle time in msec for a connected client after
which it may be disconnected.
</description>
</property>
@@ -1090,8 +1084,8 @@
<property>
<name>ipc.client.connection.maxidletime</name>
- <value>1000</value>
- <description>The maximum time after which a client will bring down the
+ <value>10000</value>
+ <description>The maximum time in msec after which a client will bring down the
connection to the server.
</description>
</property>
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon May 5 14:24:25 2008
@@ -106,8 +106,6 @@
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
throws IOException {
- RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
- 5, 200, TimeUnit.MILLISECONDS);
RetryPolicy createPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@@ -120,27 +118,10 @@
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
- exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new
HashMap<String,RetryPolicy>();
- methodNameToPolicyMap.put("open", methodPolicy);
- methodNameToPolicyMap.put("setReplication", methodPolicy);
- methodNameToPolicyMap.put("abandonBlock", methodPolicy);
- methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
- methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
- methodNameToPolicyMap.put("isDir", methodPolicy);
- methodNameToPolicyMap.put("getListing", methodPolicy);
- methodNameToPolicyMap.put("getHints", methodPolicy);
- methodNameToPolicyMap.put("getBlockLocations", methodPolicy);
- methodNameToPolicyMap.put("renewLease", methodPolicy);
- methodNameToPolicyMap.put("getStats", methodPolicy);
- methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
- methodNameToPolicyMap.put("getPreferredBlockSize", methodPolicy);
- methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
- methodNameToPolicyMap.put("complete", methodPolicy);
- methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
methodNameToPolicyMap.put("create", methodPolicy);
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Mon May 5 14:24:25 2008
@@ -24,22 +24,23 @@
import java.net.UnknownHostException;
import java.io.IOException;
-import java.io.EOFException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
+import java.io.InputStream;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Map.Entry;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -62,28 +63,65 @@
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
- private Class valueClass; // class of call values
- private int timeout;// timeout for calls
+ private Class<?> valueClass; // class of call values
private int counter; // counter for call ids
private boolean running = true; // true while client runs
- private Configuration conf;
- private int maxIdleTime; //connections will be culled if it was idle for
+ final private Configuration conf;
+ final private int maxIdleTime; //connections will be culled if it was idle
for
//maxIdleTime msecs
final private int maxRetries; //the max. no. of retries for socket
connections
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
- private Thread connectionCullerThread;
+ private int pingInterval; // how often sends ping to the server in msecs
+
private SocketFactory socketFactory; // how to create sockets
-
private int refCount = 1;
+ final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+ final public static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+ final static int PING_CALL_ID = -1;
+
+ /**
+ * set the ping interval value in configuration
+ *
+ * @param conf Configuration
+ * @param pingInterval the ping interval
+ */
+ final public static void setPingInterval(Configuration conf, int
pingInterval) {
+ conf.setInt(PING_INTERVAL_NAME, pingInterval);
+ }
+
+ /**
+ * Get the ping interval from configuration;
+ * If not set in the configuration, return the default value.
+ *
+ * @param conf Configuration
+ * @return the ping interval
+ */
+ final static int getPingInterval(Configuration conf) {
+ return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+ }
+
+ /**
+ * Increment this client's reference count
+ *
+ */
synchronized void incCount() {
- refCount++;
+ refCount++;
}
+ /**
+ * Decrement this client's reference count
+ *
+ */
synchronized void decCount() {
refCount--;
}
+ /**
+ * Return if this client has no reference
+ *
+ * @return true if this client has no reference; false otherwise
+ */
synchronized boolean isZeroReference() {
return refCount==0;
}
@@ -93,9 +131,7 @@
int id; // call id
Writable param; // parameter
Writable value; // value, null if error
- String error; // exception, null if value
- String errorClass; // class of exception
- long lastActivity; // time of last i/o
+ IOException error; // exception, null if value
boolean done; // true when call is done
protected Call(Writable param) {
@@ -103,30 +139,34 @@
synchronized (Client.this) {
this.id = counter++;
}
- touch();
}
- /** Called by the connection thread when the call is complete and the
- * value or error string are available. Notifies by default. */
- public synchronized void callComplete() {
+ /** Indicate when the call is complete and the
+ * value or error are available. Notifies by default. */
+ protected synchronized void callComplete() {
+ this.done = true;
notify(); // notify caller
}
- /** Update lastActivity with the current time. */
- public synchronized void touch() {
- lastActivity = System.currentTimeMillis();
- }
-
- /** Update lastActivity with the current time. */
- public synchronized void setResult(Writable value,
- String errorClass,
- String error) {
- this.value = value;
+ /** Set the exception when there is an error.
+ * Notify the caller the call is done.
+ *
+ * @param error exception thrown by the call; either local or remote
+ */
+ public synchronized void setException(IOException error) {
this.error = error;
- this.errorClass =errorClass;
- this.done = true;
+ callComplete();
}
+ /** Set the return value when there is no error.
+ * Notify the caller the call is done.
+ *
+ * @param value return value of the call.
+ */
+ public synchronized void setValue(Writable value) {
+ this.value = value;
+ callComplete();
+ }
}
/** Thread that reads responses and notifies callers. Each connection owns a
@@ -139,11 +179,9 @@
private DataOutputStream out;
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
- private Call readingCall;
- private Call writingCall;
- private int inUse = 0;
- private long lastActivity = 0;
- private boolean shouldCloseConnection = false;
+ private long lastActivity = 0; // last I/O activity time
+ private boolean shouldCloseConnection = false; // indicate if the
connection is closed
+ private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
this(new ConnectionId(address, null));
@@ -155,59 +193,129 @@
remoteId.getAddress().getHostName());
}
this.remoteId = remoteId;
- this.setName("IPC Client connection to " +
- remoteId.getAddress().toString());
+ UserGroupInformation ticket = remoteId.getTicket();
+ this.setName("IPC Client (" + socketFactory.hashCode() +") connection to
" +
+ remoteId.getAddress().toString() +
+ " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
this.setDaemon(true);
}
- public synchronized void setupIOstreams() throws IOException {
- if (socket != null) {
- notify();
+ /** Update lastActivity with the current time. */
+ private synchronized void touch() {
+ touch(System.currentTimeMillis());
+ }
+
+ private synchronized void touch(long curTime) {
+ lastActivity = curTime;
+ }
+
+ /** Add a call to this connection's call queue */
+ private synchronized boolean addCall(Call call) {
+ if (shouldCloseConnection)
+ return false;
+ calls.put(call.id, call);
+ notify();
+ return true;
+ }
+
+ /** This class sends a ping to the remote side when timeout on
+ * reading. If no failure is detected, it retries until at least
+ * a byte is read.
+ */
+ private class PingInputStream extends FilterInputStream {
+ /* constructor */
+ protected PingInputStream(InputStream in) {
+ super(in);
+ }
+
+ /* Process timeout exception
+ * if the connection is not going to be closed, send a ping.
+ * otherwise, throw the timeout exception.
+ */
+ private void handleTimeout(SocketTimeoutException e) throws IOException {
+ if (shouldCloseConnection || !running) {
+ throw e;
+ } else {
+ sendPing();
+ }
+ }
+
+ /** Read a byte from the stream.
+ * Send a ping if timeout on read. Retries if no failure is detected
+ * until a byte is read.
+ */
+ public int read() throws IOException {
+ do {
+ try {
+ return super.read();
+ } catch (SocketTimeoutException e) {
+ handleTimeout(e);
+ }
+ } while (true);
+ }
+
+ /** Read bytes into a buffer starting from offset <code>off</code>
+ * Send a ping if timeout on read. Retries if no failure is detected
+ * until a byte is read.
+ *
+ * @return the total number of bytes read; -1 if the connection is closed.
+ */
+ public int read(byte[] buf, int off, int len) throws IOException {
+ do {
+ try {
+ return super.read(buf, off, len);
+ } catch (SocketTimeoutException e) {
+ handleTimeout(e);
+ }
+ } while (true);
+ }
+ }
+
+ /** Connect to the server and set up the I/O streams. It then sends
+ * a header to the server and starts
+ * the connection thread that waits for responses.
+ */
+ private synchronized void setupIOstreams() {
+ if (socket != null || shouldCloseConnection) {
return;
}
short ioFailures = 0;
short timeoutFailures = 0;
- while (true) {
- try {
- this.socket = socketFactory.createSocket();
- this.socket.setTcpNoDelay(tcpNoDelay);
- // connection time out is 20s
- this.socket.connect(remoteId.getAddress(), 20000);
- break;
- } catch (SocketTimeoutException toe) {
- /* The max number of retries is 45,
- * which amounts to 20s*45 = 15 minutes retries.
- */
- handleConnectionFailure(timeoutFailures++, 45, toe);
- } catch (IOException ie) {
- handleConnectionFailure(ioFailures++, maxRetries, ie);
- }
- }
- socket.setSoTimeout(timeout);
- this.in = new DataInputStream
- (new BufferedInputStream
- (new FilterInputStream(NetUtils.getInputStream(socket)) {
- public int read(byte[] buf, int off, int len) throws IOException {
- int value = super.read(buf, off, len);
- if (readingCall != null) {
- readingCall.touch();
- }
- return value;
- }
- }));
- this.out = new DataOutputStream
- (new BufferedOutputStream
- (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
- public void write(byte[] buf, int o, int len) throws IOException {
- out.write(buf, o, len);
- if (writingCall != null) {
- writingCall.touch();
- }
- }
- }));
- writeHeader();
- notify();
+ try {
+ LOG.info("Build a connection to "+remoteId.getAddress());
+ while (true) {
+ try {
+ this.socket = socketFactory.createSocket();
+ this.socket.setTcpNoDelay(tcpNoDelay);
+ // connection time out is 20s
+ this.socket.connect(remoteId.getAddress(), 20000);
+ this.socket.setSoTimeout(pingInterval);
+ break;
+ } catch (SocketTimeoutException toe) {
+ /* The max number of retries is 45,
+ * which amounts to 20s*45 = 15 minutes retries.
+ */
+ handleConnectionFailure(timeoutFailures++, 45, toe);
+ } catch (IOException ie) {
+ handleConnectionFailure(ioFailures++, maxRetries, ie);
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream
+ (new PingInputStream(NetUtils.getInputStream(socket))));
+ this.out = new DataOutputStream
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+ writeHeader();
+
+ // update last activity time
+ touch();
+
+ } catch (IOException e) {
+ markClosed(e);
+ close();
+ }
+ // start the receiver thread after the socket connection has been set up
+ start();
}
/* Handle connection failures
@@ -235,11 +343,6 @@
// throw the exception if the maximum number of retries is reached
if (curRetries == maxRetries) {
- //reset inUse so that the culler gets a chance to throw this
- //connection object out of the table. We don't want to increment
- //inUse to infinity (everytime getConnection is called inUse is
- //incremented)!
- inUse = 0;
throw ioe;
}
@@ -251,8 +354,11 @@
LOG.info("Retrying connect to server: " + remoteId.getAddress() +
". Already tried " + curRetries + " time(s).");
}
-
- private synchronized void writeHeader() throws IOException {
+
+ /* Write the header for each connection
+ * Out is not synchronized because only the first thread does this.
+ */
+ private void writeHeader() throws IOException {
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
@@ -264,155 +370,197 @@
out.write(buf.getData(), 0, bufLen);
}
+ /* Wait until someone signals us to start reading an RPC response, or
+ * until the connection has been idle too long, is marked to be closed,
+ * or the client is marked as not running.
+ *
+ * Return true if it is time to read a response; false otherwise.
+ */
private synchronized boolean waitForWork() {
- //wait till someone signals us to start reading RPC response or
- //close the connection. If we are idle long enough (blocked in wait),
- //the ConnectionCuller thread will wake us up and ask us to close the
- //connection.
- //We need to wait when inUse is 0 or socket is null (it may be null if
- //the Connection object has been created but the socket connection
- //has not been setup yet). We stop waiting if we have been asked to close
- //connection
- while ((inUse == 0 || socket == null) && !shouldCloseConnection) {
- try {
- wait();
- } catch (InterruptedException e) {}
+ if (calls.isEmpty() && !shouldCloseConnection && running) {
+ long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+ if (timeout>0) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {}
+ }
}
- return !shouldCloseConnection;
- }
-
- private synchronized void incrementRef() {
- inUse++;
- }
-
- private synchronized void decrementRef() {
- lastActivity = System.currentTimeMillis();
- inUse--;
- }
-
- public synchronized boolean isIdle() {
- //check whether the connection is in use or just created
- if (inUse != 0) return false;
- long currTime = System.currentTimeMillis();
- if (currTime - lastActivity > maxIdleTime)
+
+ if (!calls.isEmpty() && !shouldCloseConnection && running) {
return true;
- return false;
+ } else if (shouldCloseConnection) {
+ return false;
+ } else if (!running) { //get stopped
+ markClosed((IOException)new IOException().initCause(
+ new InterruptedException()));
+ return false;
+ } else { // closed because it has been idle for more than maxIdleTime
+ markClosed(null);
+ return false;
+ }
}
public InetSocketAddress getRemoteAddress() {
return remoteId.getAddress();
}
- public void setCloseConnection() {
- shouldCloseConnection = true;
+ /* Send a ping to the server if the time elapsed
+ * since last I/O activity is equal to or greater than the ping interval
+ */
+ private synchronized void sendPing() throws IOException {
+ long curTime = System.currentTimeMillis();
+ if ( curTime - lastActivity >= pingInterval) {
+ touch(curTime);
+ synchronized (out) {
+ out.writeInt(PING_CALL_ID);
+ out.flush();
+ }
+ }
}
public void run() {
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting");
- try {
- while (running) {
- int id;
- //wait here for work - read connection or close connection
- if (waitForWork() == false)
- break;
- try {
- id = in.readInt(); // try to read an id
- } catch (SocketTimeoutException e) {
- continue;
- }
+ LOG.debug(getName() + ": starting, having connections "
+ + connections.size());
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
-
- Call call = calls.remove(id);
- boolean isError = in.readBoolean(); // read if error
- if (isError) {
- call.setResult(null, WritableUtils.readString(in),
- WritableUtils.readString(in));
- } else {
- Writable value = (Writable)ReflectionUtils.newInstance(valueClass,
conf);
- try {
- readingCall = call;
- value.readFields(in); // read value
- } finally {
- readingCall = null;
- }
- call.setResult(value, null, null);
- }
- call.callComplete(); // deliver result to caller
- //received the response. So decrement the ref count
- decrementRef();
- }
- } catch (EOFException eof) {
- // This is what happens when the remote side goes down
- } catch (Exception e) {
- LOG.info(StringUtils.stringifyException(e));
- } finally {
- //If there was no exception thrown in this method, then the only
- //way we reached here is by breaking out of the while loop (after
- //waitForWork). And if we took that route to reach here, we have
- //already removed the connection object in the ConnectionCuller thread.
- //We don't want to remove this again as some other thread might have
- //actually put a new Connection object in the table in the meantime.
- synchronized (connections) {
- if (connections.get(remoteId) == this) {
- connections.remove(remoteId);
- }
- }
- close();
+ while (waitForWork()) {//wait here for work - read or close connection
+ receiveResponse();
}
+
+ close();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": stopped, remaining connections "
+ + connections.size());
}
/** Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
*/
- public void sendParam(Call call) throws IOException {
- boolean error = true;
+ public void sendParam(Call call) {
+ synchronized (this) {
+ if (shouldCloseConnection) {
+ return;
+ }
+ }
+
try {
- calls.put(call.id, call);
synchronized (out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
- try {
- writingCall = call;
- DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
- //data to be written
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
-
- out.writeInt(dataLength); //first put the data length
- out.write(data, 0, dataLength);//write the data
- out.flush();
- } finally {
- writingCall = null;
- }
- }
- error = false;
- } finally {
- if (error) {
- synchronized (connections) {
- if (connections.get(remoteId) == this)
- connections.remove(remoteId);
- }
- close(); // close on error
+
+ DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+ //data to be written
+ d.writeInt(call.id);
+ call.param.write(d);
+ byte[] data = d.getData();
+ int dataLength = d.getLength();
+
+ out.writeInt(dataLength); //first put the data length
+ out.write(data, 0, dataLength);//write the data
+ out.flush();
}
+ } catch(IOException e) {
+ markClosed(e);
}
}
- /** Close the connection. */
- public void close() {
- //socket may be null if the connection could not be established to the
- //server in question, and the culler asked us to close the connection
- if (socket == null) return;
+ /* Receive a response.
+ * Because there is only one receiver, no synchronization on in is needed.
+ */
+ private void receiveResponse() {
+ synchronized (this) {
+ if (shouldCloseConnection) {
+ return;
+ }
+ }
+ touch();
+
try {
- socket.close(); // close socket
- } catch (IOException e) {}
+ int id = in.readInt(); // try to read an id
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + " got value #" + id);
+
+ Call call = calls.remove(id);
+
+ boolean isError = in.readBoolean(); // read if error
+ if (isError) {
+ call.setException(new RemoteException( WritableUtils.readString(in),
+ WritableUtils.readString(in)));
+ } else {
+ Writable value = (Writable)ReflectionUtils.newInstance(valueClass,
conf);
+ value.readFields(in); // read value
+ call.setValue(value);
+ }
+ } catch (IOException e) {
+ markClosed(e);
+ }
+ }
+
+ private synchronized void markClosed(IOException e) {
+ if (!shouldCloseConnection) {
+ shouldCloseConnection = true;
+ closeException = e;
+ notifyAll();
+ }
+ }
+
+ /** Close the connection. */
+ private synchronized void close() {
+ if (!shouldCloseConnection) {
+ LOG.error("The connection is not in the closed state");
+ return;
+ }
+
+ synchronized (out) {
+ // release the resources
+ // first thing to do;take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ connections.remove(remoteId);
+ }
+ }
+
+ // close the socket and streams
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ IOUtils.closeSocket(socket);
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn(
+ "A connection is closed for no cause and calls are not empty");
+
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
+ cleanupCalls();
+ }
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+ StringUtils.stringifyException(closeException));
+ }
+
+ // cleanup calls
+ cleanupCalls();
+ }
+ }
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": closing");
+ LOG.debug(getName() + ": closed");
+ }
+
+ /* Cleanup all calls and mark them as done */
+ private void cleanupCalls() {
+ Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
+ while (itor.hasNext()) {
+ Call c = itor.next().getValue();
+ c.setException(closeException); // local exception
+ itor.remove();
+ }
}
}
@@ -428,7 +576,7 @@
}
/** Deliver result to result collector. */
- public void callComplete() {
+ protected void callComplete() {
results.callComplete(this);
}
}
@@ -453,58 +601,21 @@
}
}
- private class ConnectionCuller extends Thread {
-
- public static final int MIN_SLEEP_TIME = 1000;
-
- public void run() {
-
- LOG.debug(getName() + ": starting");
-
- while (running) {
- try {
- Thread.sleep(MIN_SLEEP_TIME);
- } catch (InterruptedException ie) {}
-
- synchronized (connections) {
- Iterator i = connections.values().iterator();
- while (i.hasNext()) {
- Connection c = (Connection)i.next();
- if (c.isIdle()) {
- //We don't actually close the socket here (i.e., don't invoke
- //the close() method). We leave that work to the response
receiver
- //thread. The reason for that is since we have taken a lock on
the
- //connections table object, we don't want to slow down the entire
- //system if we happen to talk to a slow server.
- i.remove();
- synchronized (c) {
- c.setCloseConnection();
- c.notify();
- }
- }
- }
- }
- }
- }
- }
-
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
public Client(Class valueClass, Configuration conf,
SocketFactory factory) {
this.valueClass = valueClass;
- this.timeout = conf.getInt("ipc.client.timeout", 10000);
- this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
+ this.maxIdleTime =
+ conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+ this.pingInterval = getPingInterval(conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+ }
this.conf = conf;
this.socketFactory = factory;
- this.connectionCullerThread = new ConnectionCuller();
- connectionCullerThread.setDaemon(true);
- connectionCullerThread.setName(valueClass.getName() + " Connection
Culler");
- LOG.debug(valueClass.getName() +
- "Connection culler maxidletime= " + maxIdleTime + "ms");
- connectionCullerThread.start();
}
/**
@@ -535,19 +646,11 @@
return;
}
running = false;
-
- connectionCullerThread.interrupt();
- try {
- connectionCullerThread.join();
- } catch(InterruptedException e) {}
-
- // close and wake up all connections
+
+ // wake up all connections
synchronized (connections) {
for (Connection conn : connections.values()) {
- synchronized (conn) {
- conn.setCloseConnection();
- conn.notifyAll();
- }
+ conn.interrupt();
}
}
@@ -560,9 +663,6 @@
}
}
- /** Sets the timeout used for network i/o. */
- public void setTimeout(int timeout) { this.timeout = timeout; }
-
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception. */
@@ -574,20 +674,24 @@
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
- Connection connection = getConnection(addr, ticket);
Call call = new Call(param);
+ Connection connection = getConnection(addr, ticket, call);
+ connection.sendParam(call); // send the parameter
synchronized (call) {
- connection.sendParam(call); // send the parameter
- long wait = timeout;
- do {
- call.wait(wait); // wait for the result
- wait = timeout - (System.currentTimeMillis() - call.lastActivity);
- } while (!call.done && wait > 0);
+ while (!call.done) {
+ try {
+ call.wait(); // wait for the result
+ } catch (InterruptedException ignored) {}
+ }
if (call.error != null) {
- throw new RemoteException(call.errorClass, call.error);
- } else if (!call.done) {
- throw new SocketTimeoutException("timed out waiting for rpc response");
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ throw call.error;
+ } else { // local exception
+ throw (IOException)new IOException(
+ "Call failed on local exception").initCause(call.error);
+ }
} else {
return call.value;
}
@@ -607,7 +711,7 @@
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
- Connection connection = getConnection(addresses[i], null);
+ Connection connection = getConnection(addresses[i], null, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
LOG.info("Calling "+addresses[i]+" caught: " +
@@ -615,38 +719,44 @@
results.size--; // wait for one fewer result
}
}
- try {
- results.wait(timeout); // wait for all results
- } catch (InterruptedException e) {}
-
- if (results.count == 0) {
- throw new IOException("no responses");
- } else {
- return results.values;
+ while (results.count != results.size) {
+ try {
+ results.wait(); // wait for all results
+ } catch (InterruptedException e) {}
}
+
+ return results.values;
}
}
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
private Connection getConnection(InetSocketAddress addr,
- UserGroupInformation ticket)
+ UserGroupInformation ticket,
+ Call call)
throws IOException {
+ synchronized (this) {
+ if (!running) {
+ // the client is stopped
+ throw new IOException("The client is stopped");
+ }
+ }
Connection connection;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, ticket);
- synchronized (connections) {
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId);
- connections.put(remoteId, connection);
- connection.start();
+ do {
+ synchronized (connections) {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ connection = new Connection(remoteId);
+ connections.put(remoteId, connection);
+ }
}
- connection.incrementRef();
- }
+ } while (!connection.addCall(call));
+
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon May 5 14:24:25 2008
@@ -72,15 +72,8 @@
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
- // 1 : Ticket is added to connection header
- public static final byte CURRENT_VERSION = 1;
-
- /**
- * How much time should be allocated for actually running the handler?
- * Calls that are older than ipc.timeout * MAX_CALL_QUEUE_TIME
- * are ignored when the handler takes them off the queue.
- */
- private static final float MAX_CALL_QUEUE_TIME = 0.6f;
+ // 1 : Introduce ping and server does not throw away RPCs
+ public static final byte CURRENT_VERSION = 2;
/**
* How many calls/handler are allowed in the queue.
@@ -126,7 +119,7 @@
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
- private Class paramClass; // class of call parameters
+ private Class<?> paramClass; // class of call parameters
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
private int thresholdIdleConnections; // the number of idle connections
@@ -141,8 +134,6 @@
private Configuration conf;
- private int timeout;
- private long maxCallStartAge;
private int maxQueueSize;
private int socketSendBufferSize;
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@@ -160,7 +151,7 @@
private Handler[] handlers = null;
/**
- * A convience method to bind to a given address and report
+ * A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
* @param socket the socket to bind
* @param address the address to bind to
@@ -192,14 +183,15 @@
private int id; // the client's call id
private Writable param; // the parameter passed
private Connection connection; // connection to client
- private long receivedTime; // the time received
+ private long timestamp; // the time received when response is null
+ // the time served when response is not null
private ByteBuffer response; // the response for this call
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
- this.receivedTime = System.currentTimeMillis();
+ this.timestamp = System.currentTimeMillis();
this.response = null;
}
@@ -299,10 +291,9 @@
SelectionKey key = null;
try {
selector.select();
- Iterator iter = selector.selectedKeys().iterator();
-
+ Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
- key = (SelectionKey)iter.next();
+ key = iter.next();
iter.remove();
try {
if (key.isValid()) {
@@ -439,6 +430,8 @@
private class Responder extends Thread {
private Selector writeSelector;
private int pending; // connections waiting to register
+
+ final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException {
this.setName("IPC Server Responder");
@@ -456,7 +449,7 @@
while (running) {
try {
waitPending(); // If a channel is being registered, wait.
- writeSelector.select(maxCallStartAge);
+ writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter =
writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -470,7 +463,7 @@
}
}
long now = System.currentTimeMillis();
- if (now < lastPurgeTime + maxCallStartAge) {
+ if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
@@ -544,34 +537,17 @@
LOG.info("doPurge: bad channel");
return;
}
- boolean close = false;
LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
- if (now > call.receivedTime + maxCallStartAge) {
- LOG.info(getName() + ", call " + call +
- ": response discarded for being too old (" +
- (now - call.receivedTime) + ")");
- iter.remove();
- if (call.response.position() > 0) {
- /* We should probably use a different start time
- * than receivedTime. receivedTime starts when the RPC
- * was first read.
- * We have written a partial response. will close the
- * connection for now.
- */
- close = true;
- break;
- }
+ if (now > call.timestamp + PURGE_INTERVAL) {
+ closeConnection(call.connection);
+ break;
}
}
}
-
- if (close) {
- closeConnection(call.connection);
- }
}
// Processes one response. Returns true if there are no more pending
@@ -627,6 +603,9 @@
call.connection.responseQueue.addFirst(call);
if (inHandler) {
+ // set the serve time when the response has to be sent later
+ call.timestamp = System.currentTimeMillis();
+
incPending();
try {
// Wakeup the thread blocked on select, only then can the call
@@ -791,6 +770,11 @@
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
+
+ if (dataLength == Client.PING_CALL_ID) {
+ dataLengthBuffer.clear();
+ return 0; //ping message
+ }
data = ByteBuffer.allocate(dataLength);
}
@@ -868,18 +852,6 @@
try {
Call call = callQueue.take(); // pop the queue; maybe blocked here
- // throw the message away if it is too old
- if (System.currentTimeMillis() - call.receivedTime >
- maxCallStartAge) {
- ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
- int timeInQ = (int) (System.currentTimeMillis() - call.receivedTime);
- LOG.warn(getName()+", call "+call
- +": discarded for being too old (" +
- timeInQ + ")");
- rpcMetrics.rpcDiscardedOps.inc(timeInQ);
- continue;
- }
-
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
call.connection);
@@ -892,7 +864,7 @@
UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
UserGroupInformation.setCurrentUGI(call.connection.ticket);
try {
- value = call(call.param, call.receivedTime); // make the call
+ value = call(call.param, call.timestamp); // make the call
} catch (Throwable e) {
LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
@@ -939,7 +911,7 @@
* the number of handler threads that will be used to process calls.
*
*/
- protected Server(String bindAddress, int port, Class paramClass, int handlerCount, Configuration conf,
+ protected Server(String bindAddress, int port, Class<?> paramClass, int handlerCount, Configuration conf,
String serverName)
throws IOException {
this.bindAddress = bindAddress;
@@ -947,10 +919,8 @@
this.port = port;
this.paramClass = paramClass;
this.handlerCount = handlerCount;
- this.timeout = conf.getInt("ipc.client.timeout", 10000);
this.socketSendBufferSize = 0;
- maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME);
- maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+ this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -978,9 +948,6 @@
}
}
- /** Sets the timeout used for network i/o. */
- public void setTimeout(int timeout) { this.timeout = timeout; }
-
/** Sets the socket buffer size used for responding to RPCs */
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java Mon May 5 14:24:25 2008
@@ -39,7 +39,7 @@
* This class has a number of metrics variables that are publicly accessible;
* these variables (objects) have methods to update their values;
* for example:
- * <p> {@link #rpcDiscardedOps}.inc(time)
+ * <p> {@link #rpcQueueTime}.inc(time)
*
*/
public class RpcMetrics implements Updater {
@@ -71,7 +71,6 @@
public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
- public MetricsTimeVaryingRate rpcDiscardedOps = new MetricsTimeVaryingRate("RpcDiscardedOps");
public Map <String, MetricsTimeVaryingRate> metricsList =
Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
@@ -83,7 +82,6 @@
public void doUpdates(MetricsContext context) {
rpcQueueTime.pushMetric(metricsRecord);
rpcProcessingTime.pushMetric(metricsRecord);
- rpcDiscardedOps.pushMetric(metricsRecord);
synchronized (metricsList) {
// Iterate through the rpcMetrics hashmap to propogate the different rpc metrics.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgt.java Mon May 5 14:24:25 2008
@@ -98,20 +98,6 @@
/**
* @inheritDoc
*/
- public int getRpcOpsDiscardedOpsNum() {
- return myMetrics.rpcDiscardedOps.getPreviousIntervalNumOps();
- }
-
- /**
- * @inheritDoc
- */
- public long getRpcOpsDiscardedOpsQtime() {
- return myMetrics.rpcDiscardedOps.getPreviousIntervalAverageTime();
- }
-
- /**
- * @inheritDoc
- */
public int getNumOpenConnections() {
return myServer.getNumOpenConnections();
}
@@ -129,6 +115,5 @@
public void resetAllMinMax() {
myMetrics.rpcProcessingTime.resetMinMax();
myMetrics.rpcQueueTime.resetMinMax();
- myMetrics.rpcDiscardedOps.resetMinMax();
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMgtMBean.java Mon May 5 14:24:25 2008
@@ -86,19 +86,6 @@
*/
long getRpcOpsAvgQueueTimeMax();
-
- /**
- * Number of Discarded RPC operations due to timeout in the last interval
- * @return number of operations
- */
- int getRpcOpsDiscardedOpsNum();
-
- /**
- * Average Queued time for Discarded RPC Operations in last interval
- * @return time in msec
- */
- long getRpcOpsDiscardedOpsQtime();
-
/**
* Reset all min max times
*/
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon May 5 14:24:25 2008
@@ -333,7 +333,6 @@
}
JobSubmissionProtocol jobSubmitClient;
- private JobSubmissionProtocol rpcProxy;
FileSystem fs = null;
@@ -384,8 +383,7 @@
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.rpcProxy = createRPCProxy(JobTracker.getAddress(conf), conf);
- this.jobSubmitClient = createRetryProxy(this.rpcProxy);
+ this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}
@@ -395,27 +393,6 @@
JobSubmissionProtocol.versionID, addr, conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}
- /**
- * Create a proxy JobSubmissionProtocol that retries timeouts.
- *
- * @param addr the address to connect to.
- * @param conf the server's configuration.
- * @return a proxy object that will retry timeouts.
- * @throws IOException
- */
- private JobSubmissionProtocol createRetryProxy(JobSubmissionProtocol raw
- ) throws IOException {
- RetryPolicy backoffPolicy =
- RetryPolicies.retryUpToMaximumCountWithProportionalSleep
- (5, 10, java.util.concurrent.TimeUnit.SECONDS);
- Map<Class<? extends Exception>, RetryPolicy> handlers =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- handlers.put(SocketTimeoutException.class, backoffPolicy);
- RetryPolicy backoffTimeOuts =
- RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
- return (JobSubmissionProtocol)
- RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
- }
/**
* Build a job client, connect to the indicated job tracker.
@@ -425,15 +402,16 @@
*/
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
- rpcProxy = createRPCProxy(jobTrackAddr, conf);
- jobSubmitClient = createRetryProxy(rpcProxy);
+ jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
}
/**
* Close the <code>JobClient</code>.
*/
public synchronized void close() throws IOException {
- RPC.stopProxy(rpcProxy);
+ if (!(jobSubmitClient instanceof LocalJobRunner)) {
+ RPC.stopProxy(jobSubmitClient);
+ }
}
/**
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Mon May 5 14:24:25 2008
@@ -22,6 +22,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
import java.util.Random;
@@ -36,9 +37,13 @@
public class TestIPC extends TestCase {
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.TestIPC");
-
- private static Configuration conf = new Configuration();
+ final private static Configuration conf = new Configuration();
+ final static private int PING_INTERVAL = 1000;
+
+ static {
+ Client.setPingInterval(conf, PING_INTERVAL);
+ }
public TestIPC(String name) { super(name); }
private static final Random RANDOM = new Random();
@@ -51,14 +56,13 @@
public TestServer(int handlerCount, boolean sleep)
throws IOException {
super(ADDRESS, 0, LongWritable.class, handlerCount, conf);
- this.setTimeout(1000);
this.sleep = sleep;
}
public Writable call(Writable param, long receivedTime) throws IOException {
if (sleep) {
try {
- Thread.sleep(RANDOM.nextInt(200)); // sleep a bit
+ Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL)); // sleep a bit
} catch (InterruptedException e) {}
}
return param; // echo param as result
@@ -75,7 +79,6 @@
this.client = client;
this.server = server;
this.count = count;
- client.setTimeout(1000);
}
public void run() {
@@ -90,7 +93,7 @@
break;
}
} catch (Exception e) {
- LOG.fatal("Caught: " + e);
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
}
}
@@ -108,7 +111,6 @@
this.client = client;
this.addresses = addresses;
this.count = count;
- client.setTimeout(1000);
}
public void run() {
@@ -126,7 +128,7 @@
}
}
} catch (Exception e) {
- LOG.fatal("Caught: " + e);
+ LOG.fatal("Caught: " + StringUtils.stringifyException(e));
failed = true;
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java Mon May 5 14:24:25 2008
@@ -64,7 +64,6 @@
public TestServer(final int handlerCount, final boolean sleep)
throws IOException {
super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
- this.setTimeout(1000);
// Set the buffer size to half of the maximum parameter/result size
// to force the socket to block
this.setSocketSendBufSize(BYTE_COUNT / 2);
@@ -95,7 +94,6 @@
this.client = client;
this.address = address;
this.count = count;
- client.setTimeout(1000);
}
@Override
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=653607&r1=653606&r2=653607&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Mon May 5 14:24:25 2008
@@ -71,7 +71,7 @@
DistributedFileSystem dfs = (DistributedFileSystem) fs;
JobClient client = null;
-
+ MiniMRCluster mr = null;
try {
// This will test RPC to the NameNode only.
// could we test Client-DataNode connections?
@@ -85,7 +85,7 @@
assertTrue(dfs.exists(filePath));
// This will test TPC to a JobTracker
- MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
+ mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
final int jobTrackerPort = mr.getJobTrackerPort();
JobConf jconf = new JobConf(cconf);
@@ -128,6 +128,13 @@
// nothing we can do
ignored.printStackTrace();
}
+ if (mr != null) {
+ try {
+ mr.shutdown();
+ } catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
}
}
}