Author: jing9 Date: Wed Mar 19 17:33:51 2014 New Revision: 1579303 URL: http://svn.apache.org/r1579303 Log: HDFS-6100. Merge r1579301 from trunk.
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java - copied unchanged from r1579301, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java Removed: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Mar 19 17:33:51 2014 @@ -405,6 +405,9 @@ Release 2.4.0 - UNRELEASED HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth) + HDFS-6100. DataNodeWebHdfsMethods does not failover in HA mode. (Haohui Mai + via jing9) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Mar 19 17:33:51 2014 @@ -19,13 +19,13 @@ package org.apache.hadoop.hdfs.server.da import java.io.IOException; import java.io.InputStream; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.EnumSet; import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -40,6 +40,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,12 +49,14 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.hdfs.web.ParamFilter; +import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; @@ -61,7 +64,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.LengthParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.Param; @@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -86,18 +90,19 @@ public class DatanodeWebHdfsMethods { private static final UriFsPathParam ROOT = new UriFsPathParam(""); private @Context ServletContext context; + private @Context HttpServletRequest request; private @Context HttpServletResponse response; private void init(final UserGroupInformation ugi, - final DelegationParam delegation, final InetSocketAddress nnRpcAddr, + final DelegationParam delegation, final String nnId, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } - if (nnRpcAddr == null) { - throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME + if (nnId == null) { + throw new IllegalArgumentException(NamenodeAddressParam.NAME + " is not specified."); } @@ -106,15 +111,32 @@ public class DatanodeWebHdfsMethods { if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. - final Token<DelegationTokenIdentifier> token = - new Token<DelegationTokenIdentifier>(); - token.decodeFromUrlString(delegation.getValue()); - SecurityUtil.setTokenService(token, nnRpcAddr); - token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + final Token<DelegationTokenIdentifier> token = deserializeToken + (delegation.getValue(), nnId); ugi.addToken(token); } } + @VisibleForTesting + Token<DelegationTokenIdentifier> deserializeToken + (String delegation,String nnId) throws IOException { + final DataNode datanode = (DataNode) context.getAttribute("datanode"); + final Configuration conf = datanode.getConf(); + final Token<DelegationTokenIdentifier> token = new + Token<DelegationTokenIdentifier>(); + token.decodeFromUrlString(delegation); + URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + + "://" + nnId); + boolean isHA = HAUtil.isLogicalUri(conf, nnUri); + if (isHA) { + token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri)); + } else { + token.setService(SecurityUtil.buildTokenService(nnUri)); + } + token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + return token; + } + /** Handle HTTP PUT request for the root. */ @PUT @Path("/") @@ -125,9 +147,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) final PutOpParam op, @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT) @@ -141,8 +163,8 @@ public class DatanodeWebHdfsMethods { @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT) final BlockSizeParam blockSize ) throws IOException, InterruptedException { - return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission, - overwrite, bufferSize, replication, blockSize); + return put(in, ugi, delegation, namenode, ROOT, op, permission, + overwrite, bufferSize, replication, blockSize); } /** Handle HTTP PUT request. */ @@ -155,9 +177,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) final PutOpParam op, @@ -173,24 +195,22 @@ public class DatanodeWebHdfsMethods { final BlockSizeParam blockSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, permission, + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, permission, overwrite, bufferSize, replication, blockSize); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException, URISyntaxException { - return put(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - permission, overwrite, bufferSize, replication, blockSize); + return put(in, nnId, path.getAbsolutePath(), op, + permission, overwrite, bufferSize, replication, blockSize); } }); } private Response put( final InputStream in, - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final PutOpParam op, final PermissionParam permission, @@ -208,7 +228,7 @@ public class DatanodeWebHdfsMethods { conf.set(FsPermission.UMASK_LABEL, "000"); final int b = bufferSize.getValue(conf); - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = new FSDataOutputStream(dfsclient.create( @@ -225,9 +245,10 @@ public class DatanodeWebHdfsMethods { IOUtils.cleanup(LOG, out); IOUtils.cleanup(LOG, dfsclient); } - final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf); - final URI uri = new URI(WebHdfsFileSystem.SCHEME, null, - nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null); + final String scheme = "http".equals(request.getScheme()) ? + WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME; + final URI uri = URI.create(String.format("%s://%s/%s", scheme, + nnId, fullpath)); return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } default: @@ -245,15 +266,15 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize); + return post(in, ugi, delegation, namenode, ROOT, op, bufferSize); } /** Handle HTTP POST request. */ @@ -266,9 +287,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, @@ -276,23 +297,21 @@ public class DatanodeWebHdfsMethods { final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, bufferSize); + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, bufferSize); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException { - return post(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - bufferSize); + return post(in, nnId, path.getAbsolutePath(), op, + bufferSize); } }); } private Response post( final InputStream in, - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final PostOpParam op, final BufferSizeParam bufferSize @@ -304,7 +323,7 @@ public class DatanodeWebHdfsMethods { { final Configuration conf = new Configuration(datanode.getConf()); final int b = bufferSize.getValue(conf); - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = dfsclient.append(fullpath, b, null, null); @@ -332,9 +351,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) final GetOpParam op, @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) @@ -344,7 +363,7 @@ public class DatanodeWebHdfsMethods { @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length, + return get(ugi, delegation, namenode, ROOT, op, offset, length, bufferSize); } @@ -356,9 +375,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) final GetOpParam op, @@ -370,22 +389,20 @@ public class DatanodeWebHdfsMethods { final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize); + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, offset, length, bufferSize); return ugi.doAs(new PrivilegedExceptionAction<Response>() { @Override public Response run() throws IOException { - return get(ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - offset, length, bufferSize); + return get(nnId, path.getAbsolutePath(), op, offset, + length, bufferSize); } }); } private Response get( - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final GetOpParam op, final OffsetParam offset, @@ -399,7 +416,7 @@ public class DatanodeWebHdfsMethods { case OPEN: { final int b = bufferSize.getValue(conf); - final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + final DFSClient dfsclient = newDfsClient(nnId, conf); HdfsDataInputStream in = null; try { in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true)); @@ -426,7 +443,7 @@ public class DatanodeWebHdfsMethods { case GETFILECHECKSUM: { MD5MD5CRC32FileChecksum checksum = null; - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); try { checksum = dfsclient.getFileChecksum(fullpath); dfsclient.close(); @@ -441,4 +458,10 @@ public class DatanodeWebHdfsMethods { throw new UnsupportedOperationException(op + " is not supported"); } } + + private static DFSClient newDfsClient(String nnId, + Configuration conf) throws IOException { + URI uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId); + return new DFSClient(uri, conf); + } } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Mar 19 17:33:51 2014 @@ -126,7 +126,7 @@ public class NameNode implements NameNod static{ HdfsConfiguration.init(); } - + /** * Categories of operations supported by the namenode. */ @@ -269,6 +269,11 @@ public class NameNode implements NameNod private JvmPauseMonitor pauseMonitor; private ObjectName nameNodeStatusBeanName; + /** + * The service name of the delegation token issued by the namenode. It is + * the name service id in HA mode, or the rpc address in non-HA mode. + */ + private String tokenServiceName; /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. **/ @@ -306,6 +311,13 @@ public class NameNode implements NameNod return startupProgress; } + /** + * Return the service name of the issued delegation token. + * + * @return The name service id in HA-mode, or the rpc address in non-HA mode + */ + public String getTokenServiceName() { return tokenServiceName; } + public static InetSocketAddress getAddress(String address) { return NetUtils.createSocketAddr(address, DEFAULT_PORT); } @@ -499,6 +511,9 @@ public class NameNode implements NameNod loadNamesystem(conf); rpcServer = createRpcServer(conf); + final String nsId = getNameServiceId(conf); + tokenServiceName = HAUtil.isHAEnabled(conf, nsId) ? nsId : NetUtils + .getHostPortString(rpcServer.getRpcAddress()); if (NamenodeRole.NAMENODE == role) { httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Mar 19 17:33:51 2014 @@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OwnerParam; @@ -275,7 +275,7 @@ public class NamenodeWebHdfsMethods { delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery - + "&" + new NamenodeRpcAddressParam(namenode) + + "&" + new NamenodeAddressParam(namenode) + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Mar 19 17:33:51 2014 @@ -53,8 +53,11 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha + .ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.web.TestWebHDFSForHA; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; @@ -136,7 +139,22 @@ public class DFSTestUtil { NameNode.format(conf); } - + + /** + * Create a new HA-enabled configuration. + */ + public static Configuration newHAConfiguration(final String logicalName) { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); + conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, + logicalName), "nn1,nn2"); + conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" + + "." + logicalName, + ConfiguredFailoverProxyProvider.class.getName()); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + return conf; + } + /** class MyFile contains enough information to recreate the contents of * a single file. */ Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java Wed Mar 19 17:33:51 2014 @@ -18,23 +18,25 @@ package org.apache.hadoop.hdfs.web; -import java.io.IOException; -import java.net.URI; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.Token; import org.junit.Assert; import org.junit.Test; -/** Test whether WebHDFS can connect to an HA cluster */ +import java.io.IOException; +import java.net.URI; + public class TestWebHDFSForHA { private static final String LOGICAL_NAME = "minidfs"; + private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME + + "://" + LOGICAL_NAME); private static final MiniDFSNNTopology topo = new MiniDFSNNTopology() .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN( new MiniDFSNNTopology.NNConf("nn1")).addNN( @@ -42,8 +44,7 @@ public class TestWebHDFSForHA { @Test public void testHA() throws IOException { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); MiniDFSCluster cluster = null; FileSystem fs = null; try { @@ -54,8 +55,7 @@ public class TestWebHDFSForHA { cluster.waitActive(); - final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME; - fs = FileSystem.get(URI.create(uri), conf); + fs = FileSystem.get(WEBHDFS_URI, conf); cluster.transitionToActive(0); final Path dir = new Path("/test"); @@ -67,9 +67,7 @@ public class TestWebHDFSForHA { final Path dir2 = new Path("/test2"); Assert.assertTrue(fs.mkdirs(dir2)); } finally { - if (fs != null) { - fs.close(); - } + IOUtils.cleanup(null, fs); if (cluster != null) { cluster.shutdown(); } @@ -78,10 +76,9 @@ public class TestWebHDFSForHA { @Test public void testSecureHA() throws IOException { - Configuration conf = new Configuration(); + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); MiniDFSCluster cluster = null; WebHdfsFileSystem fs = null; @@ -92,8 +89,7 @@ public class TestWebHDFSForHA { HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME); cluster.waitActive(); - final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME; - fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf); + fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf); cluster.transitionToActive(0); Token<?> token = fs.getDelegationToken(null); @@ -104,9 +100,44 @@ public class TestWebHDFSForHA { fs.renewDelegationToken(token); fs.cancelDelegationToken(token); } finally { - if (fs != null) { - fs.close(); + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); } + } + } + + @Test + public void testFailoverAfterOpen() throws IOException { + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); + MiniDFSCluster cluster = null; + FileSystem fs = null; + final Path p = new Path("/test"); + final byte[] data = "Hello".getBytes(); + + try { + cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo) + .numDataNodes(1).build(); + + HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME); + + cluster.waitActive(); + + fs = FileSystem.get(WEBHDFS_URI, conf); + cluster.transitionToActive(1); + + FSDataOutputStream out = fs.create(p); + cluster.shutdownNameNode(1); + cluster.transitionToActive(0); + + out.write(data); + out.close(); + FSDataInputStream in = fs.open(p); + byte[] buf = new byte[data.length]; + IOUtils.readFully(in, buf, 0, buf.length); + Assert.assertArrayEquals(data, buf); + } finally { + IOUtils.cleanup(null, fs); if (cluster != null) { cluster.shutdown(); } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1579303&r1=1579302&r2=1579303&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Wed Mar 19 17:33:51 2014 @@ -43,13 +43,8 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.web.resources.DoAsParam; -import org.apache.hadoop.hdfs.web.resources.GetOpParam; -import org.apache.hadoop.hdfs.web.resources.HttpOpParam; -import org.apache.hadoop.hdfs.web.resources.LengthParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; -import org.apache.hadoop.hdfs.web.resources.OffsetParam; -import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.hdfs.web.resources.*; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -465,7 +460,7 @@ public class TestWebHdfsFileSystemContra AppendTestUtil.testAppend(fs, new Path(dir, "append")); } - {//test NamenodeRpcAddressParam not set. + {//test NamenodeAddressParam not set. final HttpOpParam.Op op = PutOpParam.Op.CREATE; final URL url = webhdfs.toUrl(op, dir); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -476,9 +471,9 @@ public class TestWebHdfsFileSystemContra final String redirect = conn.getHeaderField("Location"); conn.disconnect(); - //remove NamenodeRpcAddressParam + //remove NamenodeAddressParam WebHdfsFileSystem.LOG.info("redirect = " + redirect); - final int i = redirect.indexOf(NamenodeRpcAddressParam.NAME); + final int i = redirect.indexOf(NamenodeAddressParam.NAME); final int j = redirect.indexOf("&", i); String modified = redirect.substring(0, i - 1) + redirect.substring(j); WebHdfsFileSystem.LOG.info("modified = " + modified);