Author: vinayakumarb Date: Mon Jun 23 05:16:05 2014 New Revision: 1604692 URL: http://svn.apache.org/r1604692 Log: HDFS-6507. Improve DFSAdmin to support HA cluster better. (Contributd by Zesheng Wu)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1604692&r1=1604691&r2=1604692&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 23 05:16:05 2014 @@ -461,6 +461,9 @@ Release 2.5.0 - UNRELEASED HDFS-4667. Capture renamed files/directories in snapshot diff report. (jing9 and Binglin Chang via jing9) + HDFS-6507. Improve DFSAdmin to support HA cluster better. + (Zesheng Wu via vinayakumarb) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1604692&r1=1604691&r2=1604692&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Mon Jun 23 05:16:05 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -353,18 +354,42 @@ public class HAUtil { */ public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice( Configuration conf, String nsId) throws IOException { + List<ProxyAndInfo<ClientProtocol>> proxies = + getProxiesForAllNameNodesInNameservice(conf, nsId, ClientProtocol.class); + + List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>( + proxies.size()); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + namenodes.add(proxy.getProxy()); + } + return namenodes; + } + + /** + * Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC + * call should be made on every NN in an HA nameservice, not just the active. + * + * @param conf configuration + * @param nsId the nameservice to get all of the proxies for. + * @param xface the protocol class. + * @return a list of RPC proxies for each NN in the nameservice. + * @throws IOException in the event of error. + */ + public static <T> List<ProxyAndInfo<T>> getProxiesForAllNameNodesInNameservice( + Configuration conf, String nsId, Class<T> xface) throws IOException { Map<String, InetSocketAddress> nnAddresses = DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); - List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>(); + List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>( + nnAddresses.size()); for (InetSocketAddress nnAddress : nnAddresses.values()) { - NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null; + NameNodeProxies.ProxyAndInfo<T> proxyInfo = null; proxyInfo = NameNodeProxies.createNonHAProxy(conf, - nnAddress, ClientProtocol.class, + nnAddress, xface, UserGroupInformation.getCurrentUser(), false); - namenodes.add(proxyInfo.getProxy()); + proxies.add(proxyInfo); } - return namenodes; + return proxies; } /** Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1604692&r1=1604691&r2=1604692&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Mon Jun 23 05:16:05 2014 @@ -106,10 +106,13 @@ public class NameNodeProxies { public static class ProxyAndInfo<PROXYTYPE> { private final PROXYTYPE proxy; private final Text dtService; + private final InetSocketAddress address; - public ProxyAndInfo(PROXYTYPE proxy, Text dtService) { + public ProxyAndInfo(PROXYTYPE proxy, Text dtService, + InetSocketAddress address) { this.proxy = proxy; this.dtService = dtService; + this.address = address; } public PROXYTYPE getProxy() { @@ -119,6 +122,10 @@ public class NameNodeProxies { public Text getDelegationTokenService() { return dtService; } + + public InetSocketAddress getAddress() { + return address; + } } /** @@ -161,7 +168,8 @@ public class NameNodeProxies { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } - return new ProxyAndInfo<T>(proxy, dtService); + return new ProxyAndInfo<T>(proxy, dtService, + NameNode.getAddress(nameNodeUri)); } } @@ -221,7 +229,8 @@ public class NameNodeProxies { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } - return new ProxyAndInfo<T>(proxy, dtService); + return new ProxyAndInfo<T>(proxy, dtService, + NameNode.getAddress(nameNodeUri)); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); @@ -274,7 +283,7 @@ public class NameNodeProxies { throw new IllegalStateException(message); } - return new ProxyAndInfo<T>(proxy, dtService); + return new ProxyAndInfo<T>(proxy, dtService, nnAddr); } private static JournalProtocol createNNProxyWithJournalProtocol( Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1604692&r1=1604691&r2=1604692&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Mon Jun 23 05:16:05 2014 @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.Distribute import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -498,25 +499,60 @@ public class DFSAdmin extends FsShell { printUsage("-safemode"); return; } + DistributedFileSystem dfs = getDFS(); - boolean inSafeMode = dfs.setSafeMode(action); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); - // - // If we are waiting for safemode to exit, then poll and - // sleep till we are out of safemode. - // - if (waitExitSafe) { - while (inSafeMode) { - try { - Thread.sleep(5000); - } catch (java.lang.InterruptedException e) { - throw new IOException("Wait Interrupted"); + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice( + dfsConf, nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + ClientProtocol haNn = proxy.getProxy(); + boolean inSafeMode = haNn.setSafeMode(action, false); + if (waitExitSafe) { + inSafeMode = waitExitSafeMode(haNn, inSafeMode); } - inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET); + System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF") + + " in " + proxy.getAddress()); + } + } else { + boolean inSafeMode = dfs.setSafeMode(action); + if (waitExitSafe) { + inSafeMode = waitExitSafeMode(dfs, inSafeMode); + } + System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF")); + } + + } + + private boolean waitExitSafeMode(DistributedFileSystem dfs, boolean inSafeMode) + throws IOException { + while (inSafeMode) { + try { + Thread.sleep(5000); + } catch (java.lang.InterruptedException e) { + throw new IOException("Wait Interrupted"); } + inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET, false); } + return inSafeMode; + } - System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF")); + private boolean waitExitSafeMode(ClientProtocol nn, boolean inSafeMode) + throws IOException { + while (inSafeMode) { + try { + Thread.sleep(5000); + } catch (java.lang.InterruptedException e) { + throw new IOException("Wait Interrupted"); + } + inSafeMode = nn.setSafeMode(SafeModeAction.SAFEMODE_GET, false); + } + return inSafeMode; } /** @@ -561,7 +597,24 @@ public class DFSAdmin extends FsShell { int exitCode = -1; DistributedFileSystem dfs = getDFS(); - dfs.saveNamespace(); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); + + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + proxy.getProxy().saveNamespace(); + System.out.println("Save namespace successful for " + + proxy.getAddress()); + } + } else { + dfs.saveNamespace(); + System.out.println("Save namespace successful"); + } exitCode = 0; return exitCode; @@ -583,15 +636,30 @@ public class DFSAdmin extends FsShell { */ public int restoreFailedStorage(String arg) throws IOException { int exitCode = -1; - if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) { System.err.println("restoreFailedStorage valid args are true|false|check"); return exitCode; } DistributedFileSystem dfs = getDFS(); - Boolean res = dfs.restoreFailedStorage(arg); - System.out.println("restoreFailedStorage is set to " + res); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); + + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + Boolean res = proxy.getProxy().restoreFailedStorage(arg); + System.out.println("restoreFailedStorage is set to " + res + " for " + + proxy.getAddress()); + } + } else { + Boolean res = dfs.restoreFailedStorage(arg); + System.out.println("restoreFailedStorage is set to " + res); + } exitCode = 0; return exitCode; @@ -607,7 +675,24 @@ public class DFSAdmin extends FsShell { int exitCode = -1; DistributedFileSystem dfs = getDFS(); - dfs.refreshNodes(); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); + + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy: proxies) { + proxy.getProxy().refreshNodes(); + System.out.println("Refresh nodes successful for " + + proxy.getAddress()); + } + } else { + dfs.refreshNodes(); + System.out.println("Refresh nodes successful"); + } exitCode = 0; return exitCode; @@ -641,7 +726,24 @@ public class DFSAdmin extends FsShell { } DistributedFileSystem dfs = (DistributedFileSystem) fs; - dfs.setBalancerBandwidth(bandwidth); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); + + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + proxy.getProxy().setBalancerBandwidth(bandwidth); + System.out.println("Balancer bandwidth is set to " + bandwidth + + " for " + proxy.getAddress()); + } + } else { + dfs.setBalancerBandwidth(bandwidth); + System.out.println("Balancer bandwidth is set to " + bandwidth); + } exitCode = 0; return exitCode; @@ -937,11 +1039,18 @@ public class DFSAdmin extends FsShell { if (!HAUtil.isAtLeastOneActive(namenodes)) { throw new IOException("Cannot finalize with no NameNode active"); } - for (ClientProtocol haNn : namenodes) { - haNn.finalizeUpgrade(); + + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + proxy.getProxy().finalizeUpgrade(); + System.out.println("Finalize upgrade successful for " + + proxy.getAddress()); } } else { dfs.finalizeUpgrade(); + System.out.println("Finalize upgrade successful"); } return 0; @@ -958,9 +1067,25 @@ public class DFSAdmin extends FsShell { public int metaSave(String[] argv, int idx) throws IOException { String pathname = argv[idx]; DistributedFileSystem dfs = getDFS(); - dfs.metaSave(pathname); - System.out.println("Created metasave file " + pathname + " in the log " + - "directory of namenode " + dfs.getUri()); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri); + + if (isHaEnabled) { + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<ClientProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, + nsId, ClientProtocol.class); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + proxy.getProxy().metaSave(pathname); + System.out.println("Created metasave file " + pathname + " in the log " + + "directory of namenode " + proxy.getAddress()); + } + } else { + dfs.metaSave(pathname); + System.out.println("Created metasave file " + pathname + " in the log " + + "directory of namenode " + dfs.getUri()); + } return 0; } @@ -1022,20 +1147,37 @@ public class DFSAdmin extends FsShell { public int refreshServiceAcl() throws IOException { // Get the current configuration Configuration conf = getConf(); - + // for security authorization // server principal for this call // should be NN's one. conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); - // Create the client - RefreshAuthorizationPolicyProtocol refreshProtocol = - NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), - RefreshAuthorizationPolicyProtocol.class).getProxy(); - - // Refresh the authorization policy in-effect - refreshProtocol.refreshServiceAcl(); + DistributedFileSystem dfs = getDFS(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri); + + if (isHaEnabled) { + // Run refreshServiceAcl for all NNs if HA is enabled + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<RefreshAuthorizationPolicyProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId, + RefreshAuthorizationPolicyProtocol.class); + for (ProxyAndInfo<RefreshAuthorizationPolicyProtocol> proxy : proxies) { + proxy.getProxy().refreshServiceAcl(); + System.out.println("Refresh service acl successful for " + + proxy.getAddress()); + } + } else { + // Create the client + RefreshAuthorizationPolicyProtocol refreshProtocol = + NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + RefreshAuthorizationPolicyProtocol.class).getProxy(); + // Refresh the authorization policy in-effect + refreshProtocol.refreshServiceAcl(); + System.out.println("Refresh service acl successful"); + } return 0; } @@ -1054,14 +1196,32 @@ public class DFSAdmin extends FsShell { // should be NN's one. conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); - - // Create the client - RefreshUserMappingsProtocol refreshProtocol = - NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), - RefreshUserMappingsProtocol.class).getProxy(); - // Refresh the user-to-groups mappings - refreshProtocol.refreshUserToGroupsMappings(); + DistributedFileSystem dfs = getDFS(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri); + + if (isHaEnabled) { + // Run refreshUserToGroupsMapings for all NNs if HA is enabled + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId, + RefreshUserMappingsProtocol.class); + for (ProxyAndInfo<RefreshUserMappingsProtocol> proxy : proxies) { + proxy.getProxy().refreshUserToGroupsMappings(); + System.out.println("Refresh user to groups mapping successful for " + + proxy.getAddress()); + } + } else { + // Create the client + RefreshUserMappingsProtocol refreshProtocol = + NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + RefreshUserMappingsProtocol.class).getProxy(); + + // Refresh the user-to-groups mappings + refreshProtocol.refreshUserToGroupsMappings(); + System.out.println("Refresh user to groups mapping successful"); + } return 0; } @@ -1082,13 +1242,31 @@ public class DFSAdmin extends FsShell { conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); - // Create the client - RefreshUserMappingsProtocol refreshProtocol = - NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), - RefreshUserMappingsProtocol.class).getProxy(); + DistributedFileSystem dfs = getDFS(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri); - // Refresh the user-to-groups mappings - refreshProtocol.refreshSuperUserGroupsConfiguration(); + if (isHaEnabled) { + // Run refreshSuperUserGroupsConfiguration for all NNs if HA is enabled + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId, + RefreshUserMappingsProtocol.class); + for (ProxyAndInfo<RefreshUserMappingsProtocol> proxy : proxies) { + proxy.getProxy().refreshSuperUserGroupsConfiguration(); + System.out.println("Refresh super user groups configuration " + + "successful for " + proxy.getAddress()); + } + } else { + // Create the client + RefreshUserMappingsProtocol refreshProtocol = + NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + RefreshUserMappingsProtocol.class).getProxy(); + + // Refresh the user-to-groups mappings + refreshProtocol.refreshSuperUserGroupsConfiguration(); + System.out.println("Refresh super user groups configuration successful"); + } return 0; } @@ -1102,15 +1280,33 @@ public class DFSAdmin extends FsShell { // should be NN's one. conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); - - // Create the client - RefreshCallQueueProtocol refreshProtocol = - NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), - RefreshCallQueueProtocol.class).getProxy(); - // Refresh the call queue - refreshProtocol.refreshCallQueue(); - + DistributedFileSystem dfs = getDFS(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri); + + if (isHaEnabled) { + // Run refreshCallQueue for all NNs if HA is enabled + String nsId = dfsUri.getHost(); + List<ProxyAndInfo<RefreshCallQueueProtocol>> proxies = + HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId, + RefreshCallQueueProtocol.class); + for (ProxyAndInfo<RefreshCallQueueProtocol> proxy : proxies) { + proxy.getProxy().refreshCallQueue(); + System.out.println("Refresh call queue successful for " + + proxy.getAddress()); + } + } else { + // Create the client + RefreshCallQueueProtocol refreshProtocol = + NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + RefreshCallQueueProtocol.class).getProxy(); + + // Refresh the call queue + refreshProtocol.refreshCallQueue(); + System.out.println("Refresh call queue successful"); + } + return 0; } Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java?rev=1604692&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdminWithHA.java Mon Jun 23 05:16:05 2014 @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.tools; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import com.google.common.base.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestDFSAdminWithHA { + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final ByteArrayOutputStream err = new ByteArrayOutputStream(); + private MiniQJMHACluster cluster; + private Configuration conf; + private DFSAdmin admin; + private PrintStream originOut; + private PrintStream originErr; + + private static final String NSID = "ns1"; + + private void assertOutputMatches(String string) { + String errOutput = new String(out.toByteArray(), Charsets.UTF_8); + String output = new String(out.toByteArray(), Charsets.UTF_8); + + if (!errOutput.matches(string) && !output.matches(string)) { + fail("Expected output to match '" + string + + "' but err_output was:\n" + errOutput + + "\n and output was: \n" + output); + } + + out.reset(); + err.reset(); + } + + private void setHAConf(Configuration conf, String nn1Addr, String nn2Addr) { + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + "hdfs://" + NSID); + conf.set(DFSConfigKeys.DFS_NAMESERVICES, NSID); + conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, NSID); + conf.set(DFSUtil.addKeySuffixes( + DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, NSID), "nn1,nn2"); + conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1"); + conf.set(DFSUtil.addKeySuffixes( + DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, NSID, "nn1"), nn1Addr); + conf.set(DFSUtil.addKeySuffixes( + DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, NSID, "nn2"), nn2Addr); + } + + private void setUpHaCluster(boolean security) throws Exception { + conf = new Configuration(); + conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, + security); + cluster = new MiniQJMHACluster.Builder(conf).build(); + setHAConf(conf, cluster.getDfsCluster().getNameNode(0).getHostAndPort(), + cluster.getDfsCluster().getNameNode(1).getHostAndPort()); + cluster.getDfsCluster().getNameNode(0).getHostAndPort(); + admin = new DFSAdmin(); + admin.setConf(conf); + assertTrue(HAUtil.isHAEnabled(conf, "ns1")); + + originOut = System.out; + originErr = System.err; + System.setOut(new PrintStream(out)); + System.setErr(new PrintStream(err)); + } + + @After + public void tearDown() throws Exception { + System.out.flush(); + System.err.flush(); + System.setOut(originOut); + System.setErr(originErr); + } + + @Test(timeout = 30000) + public void testSetSafeMode() throws Exception { + setUpHaCluster(false); + // Enter safemode + int exitCode = admin.run(new String[] {"-safemode", "enter"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Safe mode is ON in.*"; + assertOutputMatches(message + "\n" + message + "\n"); + + // Get safemode + exitCode = admin.run(new String[] {"-safemode", "get"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "Safe mode is ON in.*"; + assertOutputMatches(message + "\n" + message + "\n"); + + // Leave safemode + exitCode = admin.run(new String[] {"-safemode", "leave"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "Safe mode is OFF in.*"; + assertOutputMatches(message + "\n" + message + "\n"); + + // Get safemode + exitCode = admin.run(new String[] {"-safemode", "get"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "Safe mode is OFF in.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testSaveNamespace() throws Exception { + setUpHaCluster(false); + // Safe mode should be turned ON in order to create namespace image. + int exitCode = admin.run(new String[] {"-safemode", "enter"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Safe mode is ON in.*"; + assertOutputMatches(message + "\n" + message + "\n"); + + exitCode = admin.run(new String[] {"-saveNamespace"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "Save namespace successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRestoreFailedStorage() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-restoreFailedStorage", "check"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "restoreFailedStorage is set to false for.*"; + // Default is false + assertOutputMatches(message + "\n" + message + "\n"); + + exitCode = admin.run(new String[] {"-restoreFailedStorage", "true"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "restoreFailedStorage is set to true for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + + exitCode = admin.run(new String[] {"-restoreFailedStorage", "false"}); + assertEquals(err.toString().trim(), 0, exitCode); + message = "restoreFailedStorage is set to false for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRefreshNodes() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-refreshNodes"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Refresh nodes successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testSetBalancerBandwidth() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-setBalancerBandwidth", "10"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Balancer bandwidth is set to 10 for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testMetaSave() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-metasave", "dfs.meta"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Created metasave file dfs.meta in the log directory" + + " of namenode.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRefreshServiceAcl() throws Exception { + setUpHaCluster(true); + int exitCode = admin.run(new String[] {"-refreshServiceAcl"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Refresh service acl successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRefreshUserToGroupsMappings() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-refreshUserToGroupsMappings"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Refresh user to groups mapping successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRefreshSuperUserGroupsConfiguration() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run( + new String[] {"-refreshSuperUserGroupsConfiguration"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Refresh super user groups configuration successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } + + @Test (timeout = 30000) + public void testRefreshCallQueue() throws Exception { + setUpHaCluster(false); + int exitCode = admin.run(new String[] {"-refreshCallQueue"}); + assertEquals(err.toString().trim(), 0, exitCode); + String message = "Refresh call queue successful for.*"; + assertOutputMatches(message + "\n" + message + "\n"); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml?rev=1604692&r1=1604691&r2=1604692&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml Mon Jun 23 05:16:05 2014 @@ -15714,8 +15714,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>RegexpComparator</type> + <expected-output>Refresh service acl successful(\n)*</expected-output> </comparator> </comparators> </test><!-- @@ -15951,8 +15951,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>RegexpComparator</type> + <expected-output>Save namespace successful(\n)*</expected-output> </comparator> </comparators> </test> @@ -16367,8 +16367,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>RegexpComparator</type> + <expected-output>Refresh user to groups mapping successful(\n)*</expected-output> </comparator> </comparators> </test> @@ -16383,8 +16383,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>RegexpComparator</type> + <expected-output>Refresh super user groups configuration successful(\n)*</expected-output> </comparator> </comparators> </test> @@ -16453,8 +16453,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>RegexpComparator</type> + <expected-output>Balancer bandwidth is set to 104857600(\n)*</expected-output> </comparator> </comparators> </test> @@ -16469,8 +16469,8 @@ </cleanup-commands> <comparators> <comparator> - <type>ExactComparator</type> - <expected-output></expected-output> + <type>SubstringComparator</type> + <expected-output>Finalize upgrade successful</expected-output> </comparator> </comparators> </test>