Author: szetszwo
Date: Wed Dec 14 01:47:57 2011
New Revision: 1214027
URL: http://svn.apache.org/viewvc?rev=1214027&view=rev
Log:
HDFS-2545. Change WebHDFS to support multiple namenodes in federation.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec 14 01:47:57 2011
@@ -175,6 +175,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2594. Support getDelegationTokens and createSymlink in WebHDFS.
(szetszwo)
+ HDFS-2545. Change WebHDFS to support multiple namenodes in federation.
+ (szetszwo)
+
IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Dec 14 01:47:57 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.Param;
@@ -89,7 +90,8 @@ public class DatanodeWebHdfsMethods {
private @Context ServletContext context;
private @Context HttpServletResponse response;
- private void init(final UserGroupInformation ugi, final DelegationParam delegation,
+ private void init(final UserGroupInformation ugi,
+ final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) throws IOException {
if (LOG.isTraceEnabled()) {
@@ -102,9 +104,8 @@ public class DatanodeWebHdfsMethods {
if (UserGroupInformation.isSecurityEnabled()) {
//add a token for RPC.
- final DataNode datanode = (DataNode)context.getAttribute("datanode");
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
- final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
+ final Token<DelegationTokenIdentifier> token =
+ new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegation.getValue());
SecurityUtil.setTokenService(token, nnRpcAddr);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
@@ -122,6 +123,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@@ -135,8 +139,8 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
- return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
- replication, blockSize);
+ return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
+ overwrite, bufferSize, replication, blockSize);
}
/** Handle HTTP PUT request. */
@@ -149,6 +153,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@@ -164,8 +171,9 @@ public class DatanodeWebHdfsMethods {
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
- init(ugi, delegation, path, op, permission, overwrite, bufferSize,
- replication, blockSize);
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+ init(ugi, delegation, nnRpcAddr, path, op, permission,
+ overwrite, bufferSize, replication, blockSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -178,7 +186,6 @@ public class DatanodeWebHdfsMethods {
case CREATE:
{
final Configuration conf = new Configuration(datanode.getConf());
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
conf.set(FsPermission.UMASK_LABEL, "000");
final int b = bufferSize.getValue(conf);
@@ -221,12 +228,15 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
- return post(in, ugi, delegation, ROOT, op, bufferSize);
+ return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
}
/** Handle HTTP POST request. */
@@ -239,6 +249,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@@ -246,7 +259,8 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
- init(ugi, delegation, path, op, bufferSize);
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+ init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -259,7 +273,6 @@ public class DatanodeWebHdfsMethods {
case APPEND:
{
final Configuration conf = new Configuration(datanode.getConf());
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
final int b = bufferSize.getValue(conf);
DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
FSDataOutputStream out = null;
@@ -291,6 +304,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -300,7 +316,8 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
- return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
+ return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
+ bufferSize);
}
/** Handle HTTP GET request. */
@@ -311,6 +328,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
+ @QueryParam(NamenodeRpcAddressParam.NAME)
+ @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
+ final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@@ -322,7 +342,8 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
- init(ugi, delegation, path, op, offset, length, bufferSize);
+ final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
+ init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -331,7 +352,6 @@ public class DatanodeWebHdfsMethods {
final String fullpath = path.getAbsolutePath();
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final Configuration conf = new Configuration(datanode.getConf());
- final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
switch(op.getValue()) {
case OPEN:
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1214027&r1=1214026&r2=1214027&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Dec 14 01:47:57 2011
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@@ -198,6 +199,7 @@ public class NamenodeWebHdfsMethods {
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
final String query = op.toQueryString() + delegationQuery
+ + "&" + new NamenodeRpcAddressParam(namenode)
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java Wed Dec 14 01:47:57 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.InetSocketAddress;
+
+/** InetSocketAddressParam parameter. */
+abstract class InetSocketAddressParam
+ extends Param<InetSocketAddress, InetSocketAddressParam.Domain> {
+ InetSocketAddressParam(final Domain domain, final InetSocketAddress value) {
+ super(domain, value);
+ }
+
+ @Override
+ public String toString() {
+ return getName() + "=" + Domain.toString(getValue());
+ }
+
+ /** The domain of the parameter. */
+ static final class Domain extends Param.Domain<InetSocketAddress> {
+ Domain(final String paramName) {
+ super(paramName);
+ }
+
+ @Override
+ public String getDomain() {
+ return "<HOST:PORT>";
+ }
+
+ @Override
+ InetSocketAddress parse(final String str) {
+ final int i = str.indexOf(':');
+ if (i < 0) {
+ throw new IllegalArgumentException("Failed to parse \"" + str
+ + "\" as " + getDomain() + ": the ':' character not found.");
+ } else if (i == 0) {
+ throw new IllegalArgumentException("Failed to parse \"" + str
+ + "\" as " + getDomain() + ": HOST is empty.");
+ } else if (i == str.length() - 1) {
+ throw new IllegalArgumentException("Failed to parse \"" + str
+ + "\" as " + getDomain() + ": PORT is empty.");
+ }
+
+ final String host = str.substring(0, i);
+ final int port;
+ try {
+ port = Integer.parseInt(str.substring(i + 1));
+ } catch(NumberFormatException e) {
+ throw new IllegalArgumentException("Failed to parse \"" + str
+ + "\" as " + getDomain() + ": the ':' position is " + i
+ + " but failed to parse PORT.", e);
+ }
+
+ try {
+ return new InetSocketAddress(host, port);
+ } catch(Exception e) {
+ throw new IllegalArgumentException("Failed to parse \"" + str
+ + "\": cannot create InetSocketAddress(host=" + host
+ + ", port=" + port + ")", e);
+ }
+ }
+
+ /** Convert an InetSocketAddress to a HOST:PORT String. */
+ static String toString(final InetSocketAddress addr) {
+ return addr.getHostName() + ":" + addr.getPort();
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java Wed Dec 14 01:47:57 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+/** Namenode RPC address parameter. */
+public class NamenodeRpcAddressParam extends InetSocketAddressParam {
+ /** Parameter name. */
+ public static final String NAME = "namenoderpcaddress";
+ /** Default parameter value. */
+ public static final String DEFAULT = "";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ * @param str a string representation of the parameter value.
+ */
+ public NamenodeRpcAddressParam(final String str) {
+ super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
+ }
+
+ /**
+ * Construct an object using the RPC address of the given namenode.
+ */
+ public NamenodeRpcAddressParam(final NameNode namenode) {
+ super(DOMAIN, namenode.getNameNodeAddress());
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java?rev=1214027&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsWithMultipleNameNodes.java Wed Dec 14 01:47:57 2011
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test WebHDFS with multiple NameNodes
+ */
+public class TestWebHdfsWithMultipleNameNodes {
+ static final Log LOG = WebHdfsTestUtil.LOG;
+
+ static private void setLogLevel() {
+ ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
+
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+ }
+
+ private static final Configuration conf = new HdfsConfiguration();
+ private static MiniDFSCluster cluster;
+ private static WebHdfsFileSystem[] webhdfs;
+
+ @BeforeClass
+ public static void setupTest() {
+ setLogLevel();
+ try {
+ setupCluster(4, 3);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void setupCluster(final int nNameNodes, final int nDataNodes)
+ throws Exception {
+ LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
+
+ conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numNameNodes(nNameNodes)
+ .numDataNodes(nDataNodes)
+ .build();
+ cluster.waitActive();
+
+ webhdfs = new WebHdfsFileSystem[nNameNodes];
+ for(int i = 0; i < webhdfs.length; i++) {
+ final InetSocketAddress addr = cluster.getNameNode(i).getHttpAddress();
+ final String uri = WebHdfsFileSystem.SCHEME + "://"
+ + addr.getHostName() + ":" + addr.getPort() + "/";
+ webhdfs[i] = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
+ }
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private static String createString(String prefix, int i) {
+ //The suffix is to make sure the strings have different lengths.
+ final String suffix = "*********************".substring(0, i+1);
+ return prefix + i + suffix + "\n";
+ }
+
+ private static String[] createStrings(String prefix, String name) {
+ final String[] strings = new String[webhdfs.length];
+ for(int i = 0; i < webhdfs.length; i++) {
+ strings[i] = createString(prefix, i);
+ LOG.info(name + "[" + i + "] = " + strings[i]);
+ }
+ return strings;
+ }
+
+ @Test
+ public void testRedirect() throws Exception {
+ final String dir = "/testRedirect/";
+ final String filename = "file";
+ final Path p = new Path(dir, filename);
+
+ final String[] writeStrings = createStrings("write to webhdfs ", "write");
+ final String[] appendStrings = createStrings("append to webhdfs ", "append");
+
+ //test create: create a file for each namenode
+ for(int i = 0; i < webhdfs.length; i++) {
+ final FSDataOutputStream out = webhdfs[i].create(p);
+ out.write(writeStrings[i].getBytes());
+ out.close();
+ }
+
+ for(int i = 0; i < webhdfs.length; i++) {
+ //check file length
+ final long expected = writeStrings[i].length();
+ Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
+ }
+
+ //test read: check file content for each namenode
+ for(int i = 0; i < webhdfs.length; i++) {
+ final FSDataInputStream in = webhdfs[i].open(p);
+ for(int c, j = 0; (c = in.read()) != -1; j++) {
+ Assert.assertEquals(writeStrings[i].charAt(j), c);
+ }
+ in.close();
+ }
+
+ //test append: append to the file for each namenode
+ for(int i = 0; i < webhdfs.length; i++) {
+ final FSDataOutputStream out = webhdfs[i].append(p);
+ out.write(appendStrings[i].getBytes());
+ out.close();
+ }
+
+ for(int i = 0; i < webhdfs.length; i++) {
+ //check file length
+ final long expected = writeStrings[i].length() + appendStrings[i].length();
+ Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
+ }
+
+ //test read: check file content for each namenode
+ for(int i = 0; i < webhdfs.length; i++) {
+ final StringBuilder b = new StringBuilder();
+ final FSDataInputStream in = webhdfs[i].open(p);
+ for(int c; (c = in.read()) != -1; ) {
+ b.append((char)c);
+ }
+ final int wlen = writeStrings[i].length();
+ Assert.assertEquals(writeStrings[i], b.substring(0, wlen));
+ Assert.assertEquals(appendStrings[i], b.substring(wlen));
+ in.close();
+ }
+ }
+}