Author: mattf
Date: Tue Aug 2 17:46:32 2011
New Revision: 1153219
URL: http://svn.apache.org/viewvc?rev=1153219&view=rev
Log:
HADOOP-6889. Make RPC to have an option to timeout - backport to 0.20-security.
Contributed by John George and Ravi Prakash.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Aug 2 17:46:32
2011
@@ -41,6 +41,9 @@ Release 0.20.205.0 - unreleased
MAPREDUCE-2494. Make the distributed cache delete entires using LRU
priority
(Robert Joseph Evans via mahadev)
+ HADOOP-6889. Make RPC to have an option to timeout - backport to
+ 0.20-security. (John George and Ravi Prakash via mattf)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified:
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/Client.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/Client.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/Client.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/Client.java
Tue Aug 2 17:46:32 2011
@@ -197,6 +197,7 @@ public class Client {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
+ private int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
@@ -223,7 +224,7 @@ public class Client {
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
}
-
+ this.rpcTimeout = remoteId.getRpcTimeout();
UserGroupInformation ticket = remoteId.getTicket();
Class<?> protocol = remoteId.getProtocol();
this.useSasl = UserGroupInformation.isSecurityEnabled();
@@ -305,11 +306,13 @@ public class Client {
}
/* Process timeout exception
- * if the connection is not going to be closed, send a ping.
+ * if the connection is not going to be closed and
+ * is not configured to have an RPC timeout, send a ping.
+ * (If rpcTimeout is greater than 0, the RPC should time out.)
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get()) {
+ if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
throw e;
} else {
sendPing();
@@ -412,6 +415,10 @@ public class Client {
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ if (rpcTimeout > 0) {
+ pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
+ }
+
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
@@ -965,39 +972,41 @@ public class Client {
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws InterruptedException, IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket,
+ ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
conf);
return call(param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code> which is servicing the <code>protocol</code>
protocol,
- * with the <code>ticket</code> credentials, returning the value.
+ * <code>address</code> which is servicing the <code>protocol</code>
protocol,
+ * with the <code>ticket</code> credentials and <code>rpcTimeout</code> as
+ * timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @deprecated Use {@link #call(Writable, ConnectionId)} instead
*/
@Deprecated
public Writable call(Writable param, InetSocketAddress addr,
- Class<?> protocol, UserGroupInformation ticket)
+ Class<?> protocol, UserGroupInformation ticket,
+ int rpcTimeout)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, conf);
+ ticket, rpcTimeout, conf);
return call(param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code>
protocol,
- * with the <code>ticket</code> credentials and <code>conf</code> as
- * configuration for this connection, returning the value.
- * Throws exceptions if there are network problems or if the remote code
+ * with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
timeout
+ * and <code>conf</code> as configuration for this connection, returning the
+ * value. Throws exceptions if there are network problems or if the remote
code
* threw an exception. */
- public Writable call(Writable param, InetSocketAddress addr,
+ public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
- Configuration conf)
+ int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, conf);
+ ticket, rpcTimeout, conf);
return call(param, remoteId);
}
@@ -1107,7 +1116,7 @@ public class Client {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
- protocol, ticket, conf);
+ protocol, ticket, 0, conf);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
@@ -1175,6 +1184,7 @@ public class Client {
UserGroupInformation ticket;
Class<?> protocol;
private static final int PRIME = 16777619;
+ private int rpcTimeout;
private String serverPrincipal;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
@@ -1183,13 +1193,14 @@ public class Client {
private int pingInterval; // how often sends ping to the server in msecs
ConnectionId(InetSocketAddress address, Class<?> protocol,
- UserGroupInformation ticket,
+ UserGroupInformation ticket, int rpcTimeout,
String serverPrincipal, int maxIdleTime,
int maxRetries, boolean tcpNoDelay,
int pingInterval) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
this.serverPrincipal = serverPrincipal;
this.maxIdleTime = maxIdleTime;
this.maxRetries = maxRetries;
@@ -1208,7 +1219,11 @@ public class Client {
UserGroupInformation getTicket() {
return ticket;
}
-
+
+ private int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
String getServerPrincipal() {
return serverPrincipal;
}
@@ -1232,9 +1247,15 @@ public class Client {
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
Configuration conf) throws IOException {
+ return getConnectionId(addr, protocol, ticket, 0, conf);
+ }
+
+ static ConnectionId getConnectionId(InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+ Configuration conf) throws IOException {
String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
return new ConnectionId(addr, protocol, ticket,
- remotePrincipal,
+ rpcTimeout, remotePrincipal,
conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
conf.getInt("ipc.client.connect.max.retries", 10),
conf.getBoolean("ipc.client.tcpnodelay", false),
@@ -1276,6 +1297,7 @@ public class Client {
&& this.maxRetries == that.maxRetries
&& this.pingInterval == that.pingInterval
&& isEqual(this.protocol, that.protocol)
+ && this.rpcTimeout == that.rpcTimeout
&& isEqual(this.serverPrincipal, that.serverPrincipal)
&& this.tcpNoDelay == that.tcpNoDelay
&& isEqual(this.ticket, that.ticket);
@@ -1291,6 +1313,7 @@ public class Client {
result = PRIME * result + maxRetries;
result = PRIME * result + pingInterval;
result = PRIME * result + ((protocol == null) ? 0 :
protocol.hashCode());
+ result = PRIME * rpcTimeout;
result = PRIME * result
+ ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
Modified:
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/RPC.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/RPC.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/RPC.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/ipc/RPC.java
Tue Aug 2 17:46:32 2011
@@ -207,9 +207,10 @@ public class RPC {
public Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) throws IOException {
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
- ticket, conf);
+ ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
}
@@ -292,7 +293,7 @@ public class RPC {
InetSocketAddress addr,
Configuration conf
) throws IOException {
- return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+ return waitForProxy(protocol, clientVersion, addr, conf, 0,
Long.MAX_VALUE);
}
/**
@@ -301,7 +302,7 @@ public class RPC {
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
- * @param timeout time in milliseconds before giving up
+ * @param connTimeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
@@ -310,13 +311,24 @@ public class RPC {
long clientVersion,
InetSocketAddress addr,
Configuration conf,
- long timeout
- ) throws IOException {
+ long connTimeout)
+ throws IOException {
+ return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ }
+
+ static VersionedProtocol waitForProxy(
+ Class<? extends VersionedProtocol> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf,
+ int rpcTimeout,
+ long connTimeout)
+ throws IOException {
long startTime = System.currentTimeMillis();
IOException ioe;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr, conf);
+ return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
@@ -325,7 +337,7 @@ public class RPC {
ioe = te;
}
// check if timed out
- if (System.currentTimeMillis()-timeout >= startTime) {
+ if (System.currentTimeMillis()-connTimeout >= startTime) {
throw ioe;
}
@@ -337,6 +349,7 @@ public class RPC {
}
}
}
+
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static VersionedProtocol getProxy(
@@ -344,15 +357,34 @@ public class RPC {
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
+ return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
}
-
+
+ /** Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address with the given rpcTimeout. */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ return getProxy(protocol, clientVersion, addr, ugi, conf, factory,
rpcTimeout);
+ }
+
+ /** Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address with an rpcTimeout of 0. */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol,
+ long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+ }
+
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) throws IOException {
+ Configuration conf, SocketFactory factory, int rpcTimeout) throws
IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
@@ -360,7 +392,7 @@ public class RPC {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory));
+ new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@@ -385,9 +417,17 @@ public class RPC {
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
+ return getProxy(protocol, clientVersion, addr, conf,
+ NetUtils.getDefaultSocketFactory(conf), 0);
+ }
+
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf, int
rpcTimeout)
+ throws IOException {
- return getProxy(protocol, clientVersion, addr, conf,
- NetUtils.getDefaultSocketFactory(conf));
+ return getProxy(protocol, clientVersion, addr, conf,
+ NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
}
/**
Modified:
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Tue Aug 2 17:46:32 2011
@@ -134,7 +134,7 @@ public class DFSClient implements FSCons
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
DatanodeID datanodeid, Configuration conf,
- Block block, Token<BlockTokenIdentifier> token) throws IOException {
+ Block block, Token<BlockTokenIdentifier> token, int socketTimeout)
throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -145,7 +145,7 @@ public class DFSClient implements FSCons
ticket.addToken(token);
return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
ClientDatanodeProtocol.versionID, addr, ticket, conf, NetUtils
- .getDefaultSocketFactory(conf));
+ .getDefaultSocketFactory(conf), socketTimeout);
}
/**
@@ -2741,7 +2741,7 @@ public class DFSClient implements FSCons
try {
// Pick the "least" datanode as the primary datanode to avoid
deadlock.
primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf,
block, accessToken);
+ primary = createClientDatanodeProtocolProxy(primaryNode, conf,
block, accessToken, socketTimeout);
newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
recoveryErrorCount++;
Modified:
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Tue Aug 2 17:46:32 2011
@@ -537,7 +537,7 @@ public class DataNode extends Configured
}
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
- DatanodeID datanodeid, final Configuration conf) throws IOException {
+ DatanodeID datanodeid, final Configuration conf, final int
socketTimeout) throws IOException {
final InetSocketAddress addr = NetUtils.createSocketAddr(
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -551,7 +551,7 @@ public class DataNode extends Configured
public InterDatanodeProtocol run() throws IOException {
return (InterDatanodeProtocol) RPC.getProxy(
InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
- addr, conf);
+ addr, conf, socketTimeout);
}
});
} catch (InterruptedException ie) {
@@ -1712,7 +1712,7 @@ public class DataNode extends Configured
for(DatanodeID id : datanodeids) {
try {
InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+ this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
socketTimeout);
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
if (info != null && info.getGenerationStamp() >=
block.getGenerationStamp()) {
if (keepLength) {
Modified:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Tue Aug 2 17:46:32 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.DFSClient.
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.*;
@@ -381,4 +382,43 @@ public class TestDFSClientRetries extend
return new LocatedBlocks(goodBlockList.getFileLength(), badBlocks,
false);
}
}
+
+
+ /**
+ * The following test first creates a file.
+ * It then fetches a located block of the file and its datanode locations.
+ * Then, it stops the first DN and expects proxy creation (rpcTimeout 500) to fail with an IOException.
+ */
+ public void testDFSClientTimeout() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ cluster.waitActive();
+
+ //create a file
+ DistributedFileSystem dfs =
(DistributedFileSystem)cluster.getFileSystem();
+ String filestr = "/foo";
+ Path filepath = new Path(filestr);
+ DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+ assertTrue(dfs.getClient().exists(filestr));
+
+ //get block info
+ LocatedBlock locatedblock =
TestInterDatanodeProtocol.getLastLocatedBlock(dfs.getClient().namenode,
filestr);
+ DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+ assertTrue(datanodeinfo.length > 0);
+
+ //shutdown a data node
+ cluster.stopDataNode(datanodeinfo[0].getName());
+ DFSClient.createClientDatanodeProtocolProxy(datanodeinfo[0], conf,
+ locatedblock.getBlock(), locatedblock.getBlockToken(), 500);
+ fail("Expected an exception to have been thrown");
+ } catch (IOException e) {
+ DFSClient.LOG.info("Got a SocketTimeoutException ", e);
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
+
}
Modified:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
Tue Aug 2 17:46:32 2011
@@ -80,10 +80,8 @@ public class TestLeaseRecovery extends j
assertEquals(REPLICATION_NUM, datanodeinfos.length);
//connect to data nodes
- InterDatanodeProtocol[] idps = new
InterDatanodeProtocol[REPLICATION_NUM];
DataNode[] datanodes = new DataNode[REPLICATION_NUM];
for(int i = 0; i < REPLICATION_NUM; i++) {
- idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i],
conf);
datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
assertTrue(datanodes[i] != null);
}
@@ -92,7 +90,7 @@ public class TestLeaseRecovery extends j
Block lastblock = locatedblock.getBlock();
DataNode.LOG.info("newblocks=" + lastblock);
for(int i = 0; i < REPLICATION_NUM; i++) {
- checkMetaInfo(lastblock, idps[i]);
+ checkMetaInfo(lastblock, datanodes[i]);
}
//setup random block sizes
@@ -108,8 +106,8 @@ public class TestLeaseRecovery extends j
for(int i = 0; i < REPLICATION_NUM; i++) {
newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
lastblock.getGenerationStamp());
- idps[i].updateBlock(lastblock, newblocks[i], false);
- checkMetaInfo(newblocks[i], idps[i]);
+ datanodes[i].updateBlock(lastblock, newblocks[i], false);
+ checkMetaInfo(newblocks[i], datanodes[i]);
}
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
@@ -127,7 +125,7 @@ public class TestLeaseRecovery extends j
long currentGS = cluster.getNameNode().namesystem.getGenerationStamp();
lastblock.setGenerationStamp(currentGS);
for(int i = 0; i < REPLICATION_NUM; i++) {
- updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
+ updatedmetainfo[i] = datanodes[i].getBlockMetaDataInfo(lastblock);
assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
assertEquals(minsize, updatedmetainfo[i].getNumBytes());
assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
Modified:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Tue Aug 2 17:46:32 2011
@@ -88,10 +88,10 @@ public class TestInterDatanodeProtocol e
assertTrue(datanodeinfo.length > 0);
//connect to a data node
- InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
- datanodeinfo[0], conf);
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
assertTrue(datanode != null);
+ InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+ datanodeinfo[0], conf, datanode.socketTimeout);
//stop block scanner, so we could compare lastScanTime
datanode.blockScannerThread.interrupt();
@@ -111,4 +111,42 @@ public class TestInterDatanodeProtocol e
if (cluster != null) {cluster.shutdown();}
}
}
+
+ /**
+ * The following test first creates a file.
+ * It then fetches a located block of the file and its datanode locations.
+ * Then, it stops the first DN and expects proxy creation (rpcTimeout 500) to fail with an IOException.
+ */
+ public void testInterDNProtocolTimeout() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ cluster.waitActive();
+
+ //create a file
+ DistributedFileSystem dfs =
(DistributedFileSystem)cluster.getFileSystem();
+ String filestr = "/foo";
+ Path filepath = new Path(filestr);
+ DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+ assertTrue(dfs.getClient().exists(filestr));
+
+ //get block info
+ LocatedBlock locatedblock =
getLastLocatedBlock(dfs.getClient().namenode, filestr);
+ DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+ assertTrue(datanodeinfo.length > 0);
+
+ //shutdown a data node
+ cluster.stopDataNode(datanodeinfo[0].getName());
+ InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+ datanodeinfo[0], conf, 500);
+ fail("Expected an exception to have been thrown");
+ } catch (IOException e) {
+ InterDatanodeProtocol.LOG.info("Got a SocketTimeoutException ", e);
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
+
}
Modified:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/ipc/TestIPC.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=1153219&r1=1153218&r2=1153219&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/ipc/TestIPC.java
(original)
+++
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/ipc/TestIPC.java
Tue Aug 2 17:46:32 2011
@@ -29,6 +29,7 @@ import java.util.Random;
import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import junit.framework.TestCase;
@@ -41,7 +42,8 @@ public class TestIPC extends TestCase {
final private static Configuration conf = new Configuration();
final static private int PING_INTERVAL = 1000;
-
+ final static private int MIN_SLEEP_TIME = 1000;
+
static {
Client.setPingInterval(conf, PING_INTERVAL);
}
@@ -64,8 +66,9 @@ public class TestIPC extends TestCase {
public Writable call(Class<?> protocol, Writable param, long receiveTime)
throws IOException {
if (sleep) {
+ // sleep a bit
try {
- Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL)); // sleep a bit
+ Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
} catch (InterruptedException e) {}
}
return param; // echo param as result
@@ -89,7 +92,7 @@ public class TestIPC extends TestCase {
try {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value =
- (LongWritable)client.call(param, server, null, null, conf);
+ (LongWritable)client.call(param, server, null, null, 0, conf);
if (!param.equals(value)) {
LOG.fatal("Call failed!");
failed = true;
@@ -139,7 +142,7 @@ public class TestIPC extends TestCase {
}
public void testSerial() throws Exception {
- testSerial(3, false, 2, 5, 100);
+ testSerial(3, false, 2, 5, 10);
}
public void testSerial(int handlerCount, boolean handlerSleep,
@@ -217,7 +220,7 @@ public class TestIPC extends TestCase {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
try {
client.call(new LongWritable(RANDOM.nextLong()),
- address, null, null, conf);
+ address, null, null, 0, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
String message = e.getMessage();
@@ -232,6 +235,27 @@ public class TestIPC extends TestCase {
}
}
+ public void testIpcTimeout() throws Exception {
+ // start server
+ Server server = new TestServer(1, true);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ // start client
+ Client client = new Client(LongWritable.class, conf);
+ // set timeout to be less than MIN_SLEEP_TIME
+ try {
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, MIN_SLEEP_TIME/2);
+ fail("Expected an exception to have been thrown");
+ } catch (SocketTimeoutException e) {
+ LOG.info("Get a SocketTimeoutException ", e);
+ }
+ // set timeout to be bigger than 3*ping interval
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME);
+ }
+
private static class LongErrorWritable extends LongWritable {
private final static String ERR_MSG =
"Come across an exception while reading";
@@ -258,7 +282,7 @@ public class TestIPC extends TestCase {
Client client = new Client(LongErrorWritable.class, conf);
try {
client.call(new LongErrorWritable(RANDOM.nextLong()),
- addr, null, null, conf);
+ addr, null, null, 0, conf);
fail("Expected an exception to have been thrown");
} catch (IOException e) {
// check error