Author: todd
Date: Tue Jan 17 18:39:09 2012
New Revision: 1232531
URL: http://svn.apache.org/viewvc?rev=1232531&view=rev
Log:
HDFS-2592. Balancer support for HA namenodes. Contributed by Uma Maheswara Rao G.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1232531&r1=1232530&r2=1232531&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
Tue Jan 17 18:39:09 2012
@@ -113,3 +113,5 @@ HDFS-2772. On transition to active, stan
HDFS-2767. ConfiguredFailoverProxyProvider should support NameNodeProtocol.
(Uma Maheswara Rao G via todd)
HDFS-2795. Standby NN takes a long time to recover from a dead DN starting up.
(todd)
+
+HDFS-2592. Balancer support for HA namenodes. (Uma Maheswara Rao G via todd)
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1232531&r1=1232530&r2=1232531&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
Tue Jan 17 18:39:09 2012
@@ -22,11 +22,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,11 +32,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -46,13 +43,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
@@ -83,13 +74,24 @@ class NameNodeConnector {
NameNodeConnector(Collection<InetSocketAddress> haNNs,
Configuration conf) throws IOException {
- InetSocketAddress nn = Lists.newArrayList(haNNs).get(0);
- // TODO(HA): need to deal with connecting to HA NN pair here
- this.namenodeAddress = nn;
- this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(nn, conf,
- UserGroupInformation.getCurrentUser());
- this.client = DFSUtil.createNamenode(conf);
- this.fs = FileSystem.get(NameNode.getUri(nn), conf);
+ this.namenodeAddress = Lists.newArrayList(haNNs).get(0);
+ URI nameNodeUri = NameNode.getUri(this.namenodeAddress);
+ NamenodeProtocol failoverNamenode = (NamenodeProtocol) HAUtil
+ .createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class);
+ if (null != failoverNamenode) {
+ this.namenode = failoverNamenode;
+ } else {
+ this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol(
+ this.namenodeAddress, conf, UserGroupInformation.getCurrentUser());
+ }
+ ClientProtocol failOverClient = (ClientProtocol) HAUtil
+ .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class);
+ if (null != failOverClient) {
+ this.client = failOverClient;
+ } else {
+ this.client = DFSUtil.createNamenode(conf);
+ }
+ this.fs = FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1232531&r1=1232530&r2=1232531&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
Tue Jan 17 18:39:09 2012
@@ -329,7 +329,7 @@ class NameNodeRpcServer implements Namen
throw new IllegalArgumentException(
"Unexpected not positive size: "+size);
}
-
+ namesystem.checkOperation(OperationCategory.READ);
return namesystem.getBlockManager().getBlocks(datanode, size);
}
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1232531&r1=1232530&r2=1232531&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
Tue Jan 17 18:39:09 2012
@@ -42,24 +42,23 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
/**
* This class tests if a balancer schedules tasks correctly.
*/
public class TestBalancer extends TestCase {
private static final Log LOG = LogFactory.getLog(
- "org.apache.hadoop.hdfs.TestReplication");
+ "org.apache.hadoop.hdfs.TestBalancer");
- final private static long CAPACITY = 500L;
- final private static String RACK0 = "/rack0";
- final private static String RACK1 = "/rack1";
- final private static String RACK2 = "/rack2";
- final static private String fileName = "/tmp.txt";
- final static private Path filePath = new Path(fileName);
+ final static long CAPACITY = 500L;
+ final static String RACK0 = "/rack0";
+ final static String RACK1 = "/rack1";
+ final static String RACK2 = "/rack2";
+ final private static String fileName = "/tmp.txt";
+ final static Path filePath = new Path(fileName);
private MiniDFSCluster cluster;
ClientProtocol client;
@@ -83,9 +82,10 @@ public class TestBalancer extends TestCa
}
/* create a file with a length of <code>fileLen</code> */
- private void createFile(long fileLen, short replicationFactor)
+ static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
+ short replicationFactor, int nnIndex)
throws IOException {
- FileSystem fs = cluster.getFileSystem();
+ FileSystem fs = cluster.getFileSystem(nnIndex);
DFSTestUtil.createFile(fs, filePath, fileLen,
replicationFactor, r.nextLong());
DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
@@ -104,7 +104,7 @@ public class TestBalancer extends TestCa
short replicationFactor = (short)(numNodes-1);
long fileLen = size/replicationFactor;
- createFile(fileLen, replicationFactor);
+ createFile(cluster , filePath, fileLen, replicationFactor, 0);
List<LocatedBlock> locatedBlocks = client.
getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
@@ -212,7 +212,8 @@ public class TestBalancer extends TestCa
* @throws IOException - if getStats() fails
* @throws TimeoutException
*/
- private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+ static void waitForHeartBeat(long expectedUsedSpace,
+ long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
@@ -249,7 +250,8 @@ public class TestBalancer extends TestCa
* @throws IOException
* @throws TimeoutException
*/
- private void waitForBalancer(long totalUsedSpace, long totalCapacity)
+ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+ ClientProtocol client, MiniDFSCluster cluster)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
@@ -312,7 +314,8 @@ public class TestBalancer extends TestCa
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity*3/10;
- createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
+ createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+ (short) numOfDatanodes, 0);
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null,
new String[]{newRack}, new long[]{newCapacity});
@@ -328,7 +331,7 @@ public class TestBalancer extends TestCa
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
- waitForHeartBeat(totalUsedSpace, totalCapacity);
+ waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Map<String, Map<String, InetSocketAddress>> namenodes =
@@ -336,9 +339,9 @@ public class TestBalancer extends TestCa
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
- waitForHeartBeat(totalUsedSpace, totalCapacity);
+ waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
- waitForBalancer(totalUsedSpace, totalCapacity);
+ waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
}
/** one-node cluster test*/
@@ -403,7 +406,8 @@ public class TestBalancer extends TestCa
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity * 3 / 10;
- createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+ createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+ (short) numOfDatanodes, 0);
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity });
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1232531&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
(added)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
Tue Jan 17 18:39:09 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.junit.Test;
+
+/**
+ * Test balancer with HA NameNodes
+ */
+public class TestBalancerWithHANameNodes {
+ private MiniDFSCluster cluster;
+ ClientProtocol client;
+
+ static {
+ Balancer.setBlockMoveWaitTime(1000L);
+ }
+
+ /**
+ * Test a cluster with even distribution, then a new empty node is added to
+ * the cluster. Test start a cluster with specified number of nodes, and fills
+ * it to be 30% full (with a single file replicated identically to all
+ * datanodes); It then adds one new empty node and starts balancing.
+ */
+ @Test(timeout = 60000)
+ public void testBalancerWithHANameNodes() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ TestBalancer.initConf(conf);
+ long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
+ String newNodeRack = TestBalancer.RACK2; // new node's rack
+ // array of racks for original nodes in cluster
+ String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
+ // array of capacities of original nodes in cluster
+ long[] capacities = new long[] { TestBalancer.CAPACITY,
+ TestBalancer.CAPACITY };
+ assertEquals(capacities.length, racks.length);
+ int numOfDatanodes = capacities.length;
+ NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
+ nn1Conf.setIpcPort(NameNode.DEFAULT_PORT);
+ MiniDFSNNTopology simpleHATopology = new MiniDFSNNTopology()
+ .addNameservice(new MiniDFSNNTopology.NSConf(null).addNN(nn1Conf)
+ .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+ cluster = new MiniDFSCluster.Builder(conf).nnTopology(simpleHATopology)
+ .numDataNodes(capacities.length).racks(racks).simulatedCapacities(
+ capacities).build();
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(1);
+ Thread.sleep(500);
+ client = DFSUtil.createNamenode(cluster.getNameNode(1)
+ .getNameNodeAddress(), conf);
+ long totalCapacity = TestBalancer.sum(capacities);
+ // fill up the cluster to be 30% full
+ long totalUsedSpace = totalCapacity * 3 / 10;
+ TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
+ / numOfDatanodes, (short) numOfDatanodes, 1);
+
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
+ new long[] { newNodeCapacity });
+
+ HATestUtil.setFailoverConfigurations(cluster, conf, NameNode.getUri(
+ cluster.getNameNode(0).getNameNodeAddress()).getHost());
+ totalCapacity += newNodeCapacity;
+ TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+ cluster);
+ Map<String, Map<String, InetSocketAddress>> namenodes = DFSUtil
+ .getNNServiceRpcAddresses(conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+ cluster);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java?rev=1232531&r1=1232530&r2=1232531&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
(original)
+++
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
Tue Jan 17 18:39:09 2012
@@ -127,34 +127,36 @@ public abstract class HATestUtil {
super(message);
}
}
-
+
+ /** Gets the filesystem instance by setting the failover configurations */
public static FileSystem configureFailoverFs(MiniDFSCluster cluster,
Configuration conf)
throws IOException, URISyntaxException {
+ conf = new Configuration(conf);
+ String logicalName = getLogicalHostname(cluster);
+ setFailoverConfigurations(cluster, conf, logicalName);
+ FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
+ return fs;
+ }
+
+ /** Sets the required configurations for performing failover */
+ public static void setFailoverConfigurations(MiniDFSCluster cluster,
+ Configuration conf, String logicalName) {
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
-
String nsId = "nameserviceId1";
-
String nameNodeId1 = "nn1";
String nameNodeId2 = "nn2";
- String logicalName = getLogicalHostname(cluster);
-
- conf = new Configuration(conf);
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" +
nnAddr1.getPort();
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" +
nnAddr2.getPort();
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nsId, nameNodeId1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nsId, nameNodeId2), address2);
-
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
nameNodeId1 + "," + nameNodeId2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
ConfiguredFailoverProxyProvider.class.getName());
-
- FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
- return fs;
}