Author: jing9
Date: Fri Sep 6 17:17:30 2013
New Revision: 1520637

URL: http://svn.apache.org/r1520637
Log:
HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
Contributed by Jing Zhao.
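The knob added here is strictly a test hook. A minimal sketch of its intended use follows; the hdfs://mycluster logical nameservice and the path are illustrative, not part of this patch, and per the changes below the client must point at an HA nameservice for the lossy handler to be installed:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DropResponsesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Drop the first 2 NameNode responses of every RPC call; the retries
    // that follow exercise the NameNode's retry cache.
    conf.setInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 2);
    // HA is required: "mycluster" is a hypothetical logical nameservice.
    FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
    fs.mkdirs(new Path("/tmp/retry-cache-test"));
    fs.close();
  }
}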
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1520637&r1=1520636&r2=1520637&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 6 17:17:30 2013
@@ -244,6 +244,9 @@ Release 2.3.0 - UNRELEASED

   NEW FEATURES

+    HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
+    (jing9)
+
   IMPROVEMENTS

     HDFS-4657. Limit the number of blocks logged by the NN after a block
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1520637&r1=1520636&r2=1520637&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 6 17:17:30 2013
@@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -100,6 +100,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32G
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -113,13 +114,13 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -144,6 +145,7 @@ import org.apache.hadoop.io.EnumSetWrita
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -451,7 +453,11 @@ public class DFSClient implements java.i
   /**
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
-   * Exactly one of nameNodeUri or rpcNamenode must be null.
+   * If HA is enabled and a positive value is set for
+   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+   * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
+   * must be null.
    */
   @VisibleForTesting
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -475,7 +481,20 @@ public class DFSClient implements java.i
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
         DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
-    if (rpcNamenode != null) {
+    int numResponseToDrop = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    if (numResponseToDrop > 0) {
+      // This case is used for testing.
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+          + " is set to " + numResponseToDrop
+          + ", this hacked client will proactively drop responses");
+      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
+          .createProxyWithLossyRetryHandler(conf, nameNodeUri,
+              ClientProtocol.class, numResponseToDrop);
+      this.dtService = proxyInfo.getDelegationTokenService();
+      this.namenode = proxyInfo.getProxy();
+    } else if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
       this.namenode = rpcNamenode;
@@ -514,7 +533,7 @@ public class DFSClient implements java.i
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
   }
-  
+
   /**
    * Return the socket addresses to use with each configured
    * local interface. Local interfaces may be specified by IP
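A note on the LossyRetryInvocationHandler referenced above: it sits in front of the failover proxy and deliberately "loses" the first numResponseToDrop responses of a call, so the retry layer must re-send the request even though the server has already applied the operation; that duplicate request is exactly what the NameNode retry cache must absorb. The toy stand-in below is not the Hadoop class; it is simplified to count per method rather than per call attempt, and it assumes the proxied interface declares IOException, as ClientProtocol does:

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class ToyLossyHandler implements InvocationHandler {
  private final Object delegate;
  private final int numToDrop;
  private final Map<Method, Integer> invocations = new HashMap<Method, Integer>();

  ToyLossyHandler(Object delegate, int numToDrop) {
    this.delegate = delegate;
    this.numToDrop = numToDrop;
  }

  @Override
  public synchronized Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    Object result;
    try {
      // The real call happens, so the server-side effect is applied ...
      result = method.invoke(delegate, args);
    } catch (InvocationTargetException e) {
      throw e.getCause();
    }
    Integer seen = invocations.get(method);
    int count = (seen == null) ? 0 : seen.intValue();
    invocations.put(method, count + 1);
    if (count < numToDrop) {
      // ... but the client pretends the response never arrived.
      // Requires the proxied interface to declare IOException.
      throw new IOException("response dropped for testing");
    }
    return result;
  }

  @SuppressWarnings("unchecked")
  static <T> T wrap(Class<T> iface, T delegate, int numToDrop) {
    return (T) Proxy.newProxyInstance(iface.getClassLoader(),
        new Class<?>[] { iface }, new ToyLossyHandler(delegate, numToDrop));
  }
}

This also explains the retry-policy arithmetic in the NameNodeProxies change further down: the failover policy must allow at least numResponseToDrop + 1 attempts so the call can eventually observe a response.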
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1520637&r1=1520636&r2=1520637&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 6 17:17:30 2013
@@ -497,6 +497,12 @@ public class DFSConfigKeys extends Commo
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+  // The number of NameNode responses the client proactively drops in each RPC
+  // call. To test the NN retry cache, set this property to a positive value.
+  public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+  public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
   // Hidden configuration undocumented in hdfs-site.xml
   // Timeout to wait for block receiver and responder thread to stop
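As the new comment says, the key exists to test the NameNode retry cache. A hedged sketch of a harness follows; MiniDFSCluster and MiniDFSNNTopology are pre-existing Hadoop test utilities, and this particular wiring is illustrative rather than part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;

public class RetryCacheHarness {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 1);
    // An HA pair is required for the lossy client; zero DataNodes is enough
    // for namespace operations such as mkdirs and rename.
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(0)
        .build();
    try {
      cluster.waitActive();
      cluster.transitionToActive(0);
      // Clients created from this conf now drop the first response of each
      // RPC call and retry, exercising the NN retry cache.
    } finally {
      cluster.shutdown();
    }
  }
}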
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1520637&r1=1520636&r2=1520637&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Sep 6 17:17:30 2013
@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.hdfs;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;

 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
@@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,61 @@ public class NameNodeProxies {
       return new ProxyAndInfo<T>(proxy, dtService);
     }
   }
+
+  /**
+   * Generate a dummy namenode proxy instance that utilizes our hacked
+   * {@link LossyRetryInvocationHandler}. Proxy instances generated using this
+   * method will proactively drop RPC responses. Currently this method only
+   * supports HA setups; an IllegalStateException will be thrown if the given
+   * configuration is not for HA.
+   *
+   * @param config the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param numResponseToDrop the number of responses to drop for each RPC call
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+      Configuration config, URI nameNodeUri, Class<T> xface,
+      int numResponseToDrop) throws IOException {
+    Preconditions.checkArgument(numResponseToDrop > 0);
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
+        getFailoverProxyProviderClass(config, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) { // HA case
+      FailoverProxyProvider<T> failoverProxyProvider =
+          createFailoverProxyProvider(config, failoverProxyProviderClass,
+              xface, nameNodeUri);
+      int delay = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int maxCap = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+      int maxFailoverAttempts = config.getInt(
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
+          numResponseToDrop, failoverProxyProvider,
+          RetryPolicies.failoverOnNetworkException(
+              RetryPolicies.TRY_ONCE_THEN_FAIL,
+              Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
+              maxCap));
+
+      T proxy = (T) Proxy.newProxyInstance(
+          failoverProxyProvider.getInterface().getClassLoader(),
+          new Class[] { xface }, dummyHandler);
+      Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+      return new ProxyAndInfo<T>(proxy, dtService);
+    } else {
+      throw new IllegalStateException("Currently creating proxy using " +
+          "LossyRetryInvocationHandler requires NN HA setup");
+    }
+  }

   /**
    * Creates an explicitly non-HA-enabled proxy object. Most of the time you
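Finally, a hedged sketch of calling the new factory method directly, mirroring what the DFSClient constructor above does when the drop count is positive; the nameservice URI is hypothetical, and without a configured failover proxy provider the method throws IllegalStateException:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.Text;

public class LossyProxyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "mycluster" must be a logical nameservice with a configured failover
    // proxy provider (HA), or IllegalStateException is thrown.
    NameNodeProxies.ProxyAndInfo<ClientProtocol> info =
        NameNodeProxies.createProxyWithLossyRetryHandler(
            conf, URI.create("hdfs://mycluster"), ClientProtocol.class, 2);
    ClientProtocol namenode = info.getProxy();
    Text dtService = info.getDelegationTokenService();
    System.out.println("Token service: " + dtService);
    // Any ClientProtocol call now has its first two responses dropped and
    // retried by the lossy handler before a result is returned.
    long[] stats = namenode.getStats();
    System.out.println("Capacity: " + stats[0]);
  }
}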