Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Nov  2 05:34:31 2011
@@ -388,6 +388,8 @@ public class DFSInputStream extends FSIn
     DatanodeInfo chosenNode = null;
     int refetchToken = 1; // only need to get a new access token once
 
+    boolean connectFailedOnce = false;
+
     while (true) {
       //
       // Compute desired block
@@ -409,6 +411,10 @@ public class DFSInputStream extends FSIn
             accessToken,
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
+        if(connectFailedOnce) {
+          DFSClient.LOG.info("Successfully connected to " + targetAddr +
+                             " for block " + blk.getBlockId());
+        }
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
@@ -428,11 +434,9 @@ public class DFSInputStream extends FSIn
           refetchToken--;
           fetchBlockAt(target);
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr
-              + ", add to deadNodes and continue " + ex);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", ex);
-          }
+          connectFailedOnce = true;
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+            + ", add to deadNodes and continue. " + ex, ex);
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
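The DFSInputStream change above collapses a warn/debug pair into a single warn that carries the stack trace, and adds a one-shot flag so that a later successful connection to another replica is logged at info. A minimal, self-contained sketch of the same retry-and-blacklist pattern (all names here are illustrative, not the real DFSClient API):

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative sketch of the retry-and-blacklist pattern used above. */
    public class DeadNodeRetryDemo {
        private final Set<String> deadNodes = new HashSet<String>();

        /** Hypothetical collaborators; stand-ins for the real block-reader setup. */
        interface NodePicker { String chooseNode(Set<String> exclude) throws IOException; }
        interface Connector { void connect(String node) throws IOException; }

        String connectWithBlacklist(NodePicker picker, Connector conn) throws IOException {
            boolean connectFailedOnce = false;
            while (true) {
                // Throws once every replica has been blacklisted.
                String node = picker.chooseNode(deadNodes);
                try {
                    conn.connect(node);
                    if (connectFailedOnce) {
                        System.out.println("Successfully connected to " + node
                            + " after earlier failures");
                    }
                    return node;
                } catch (IOException ex) {
                    connectFailedOnce = true;
                    // Blacklist the node and let the loop pick another replica.
                    deadNodes.add(node);
                }
            }
        }
    }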
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Nov  2 05:34:31 2011
@@ -1033,9 +1033,7 @@ class DFSOutputStream extends FSOutputSu
           // send the request
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
-              nodes.length, block.getNumBytes(), bytesSent, newGS);
-          checksum.writeHeader(out);
-          out.flush();
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
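This change folds the checksum header into the writeBlock request itself, so the checksum type and bytes-per-checksum travel inside the protobuf message instead of as a separately flushed raw header. A rough sketch of why a self-describing request is simpler for the receiver (hand-rolled types with hypothetical names, not the real Sender internals):

    /** Sketch: carry stream metadata inside the request object instead of
     *  writing a separate raw header after it. Names are illustrative only. */
    public class WriteRequestDemo {
        static class ChecksumSpec {
            final String type; final int bytesPerChecksum;
            ChecksumSpec(String type, int bpc) { this.type = type; this.bytesPerChecksum = bpc; }
        }
        static class WriteBlockRequest {
            final long blockId;
            final ChecksumSpec checksum; // checksum is now part of the request
            WriteBlockRequest(long blockId, ChecksumSpec checksum) {
                this.blockId = blockId; this.checksum = checksum;
            }
        }
        public static void main(String[] args) {
            // One self-describing message; no separate header write/flush needed,
            // and no second parse step on the datanode side.
            WriteBlockRequest req = new WriteBlockRequest(42L, new ChecksumSpec("CRC32C", 512));
            System.out.println("block=" + req.blockId
                + " checksum=" + req.checksum.type + "/" + req.checksum.bytesPerChecksum);
        }
    }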
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Nov  2 05:34:31 2011
@@ -22,36 +22,29 @@ import static org.apache.hadoop.hdfs.DFS
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -688,84 +681,38 @@ public class DFSUtil {
 
   /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(Configuration conf) throws IOException {
+  public static ClientProtocol createNamenode(Configuration conf)
+      throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
 
   /** Create a {@link NameNode} proxy */
   public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf) throws IOException {
-    return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
-  }
-
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi) throws IOException {
-    return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
-  }
-
-  /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
-      Configuration conf, UserGroupInformation ugi)
-    throws IOException {
-    return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
-        ClientProtocol.versionID, nameNodeAddr, ugi, conf,
-        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+      Configuration conf) throws IOException {
+    return createNamenode(nameNodeAddr, conf,
+        UserGroupInformation.getCurrentUser());
   }
 
   /** Create a {@link NameNode} proxy */
-  public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
-    throws IOException {
-    RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-        5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
-    Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
-
-    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
-      new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class,
-        RetryPolicies.retryByRemoteException(
-            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
-
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        rpcNamenode, methodNameToPolicyMap);
+  public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+      Configuration conf, UserGroupInformation ugi) throws IOException {
+    /**
+     * Currently we have simply burnt-in support for a SINGLE
+     * protocol - protocolR23Compatible. This will be replaced
+     * by a way to pick the right protocol based on the
+     * version of the target server.
+     */
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
   }
 
   /** Create a {@link ClientDatanodeProtocol} proxy */
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      LocatedBlock locatedBlock)
-      throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(
-      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
-    if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
-      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
-    }
-
-    // Since we're creating a new UserGroupInformation here, we know that no
-    // future RPC proxies will be able to re-use the same connection. And
-    // usages of this proxy tend to be one-off calls.
-    //
-    // This is a temporary fix: callers should really achieve this by using
-    // RPC.stopProxy() on the resulting object, but this is currently not
-    // working in trunk. See the discussion on HDFS-1965.
-    Configuration confWithNoIpcIdle = new Configuration(conf);
-    confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
-        .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
-
-    UserGroupInformation ticket = UserGroupInformation
-        .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
-    ticket.addToken(locatedBlock.getBlockToken());
-    return RPC.getProxy(ClientDatanodeProtocol.class,
-        ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
-        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+      LocatedBlock locatedBlock) throws IOException {
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
+            locatedBlock);
   }
 
   /**
@@ -776,6 +723,14 @@ public class DFSUtil {
     return collection != null && collection.size() != 0;
   }
 
+  /** Create a {@link ClientDatanodeProtocol} proxy */
+  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return new org.apache.hadoop.hdfs.protocolR23Compatible.
+        ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory);
+  }
+
   /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
    * matching the local node address.
@@ -919,4 +874,14 @@ public class DFSUtil {
   private interface AddressMatcher {
     public boolean match(InetSocketAddress s);
   }
+
+  /** Create a URI from the scheme and address */
+  public static URI createUri(String scheme, InetSocketAddress address) {
+    try {
+      return new URI(scheme, null, address.getHostName(), address.getPort(),
+          null, null, null);
+    } catch (URISyntaxException ue) {
+      throw new IllegalArgumentException(ue);
+    }
+  }
 }
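The new DFSUtil.createUri helper wraps the checked URISyntaxException in an unchecked IllegalArgumentException, which suits callers that construct URIs from addresses they already validated. A small usage sketch (standalone, using only java.net, so it can be tried outside Hadoop):

    import java.net.InetSocketAddress;
    import java.net.URI;
    import java.net.URISyntaxException;

    public class CreateUriDemo {
        /** Same shape as the new DFSUtil.createUri: checked -> unchecked. */
        public static URI createUri(String scheme, InetSocketAddress address) {
            try {
                return new URI(scheme, null, address.getHostName(), address.getPort(),
                    null, null, null);
            } catch (URISyntaxException ue) {
                throw new IllegalArgumentException(ue);
            }
        }

        public static void main(String[] args) {
            InetSocketAddress addr = new InetSocketAddress("namenode.example.com", 8020);
            System.out.println(createUri("hdfs", addr)); // hdfs://namenode.example.com:8020
        }
    }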
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Nov  2 05:34:31 2011
@@ -810,7 +810,6 @@ public class DistributedFileSystem exten
       ) throws IOException {
     Token<DelegationTokenIdentifier> result =
       dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
-    result.setService(new Text(getCanonicalServiceName()));
     return result;
   }
 
@@ -830,7 +829,7 @@ public class DistributedFileSystem exten
   @Deprecated
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    return dfs.getDelegationToken(renewer);
+    return getDelegationToken(renewer.toString());
   }
 
   @Override // FileSystem
@@ -847,10 +846,15 @@ public class DistributedFileSystem exten
    * @param token delegation token obtained earlier
    * @return the new expiration time
    * @throws IOException
+   * @deprecated Use Token.renew instead.
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
-    return dfs.renewDelegationToken(token);
+    try {
+      return token.renew(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**
@@ -858,10 +862,15 @@ public class DistributedFileSystem exten
    *
    * @param token delegation token
    * @throws IOException
+   * @deprecated Use Token.cancel instead.
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    dfs.cancelDelegationToken(token);
+    try {
+      token.cancel(getConf());
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Caught interrupted", ie);
+    }
   }
 
   /**
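renewDelegationToken and cancelDelegationToken now delegate to the token's own renew/cancel methods, which dispatch through the pluggable TokenRenewer mechanism rather than going straight at the DFSClient RPC. A minimal sketch of that call shape (assuming a Hadoop classpath; the token variable is whatever a prior getDelegationToken call returned):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.token.Token;

    public class TokenLifecycleDemo {
        /** Renew then cancel a previously fetched delegation token. */
        public static void renewThenCancel(Token<?> token, Configuration conf)
                throws Exception {
            // Token.renew looks up a TokenRenewer for token.getKind() and
            // returns the new expiration time.
            long newExpiry = token.renew(conf);
            System.out.println("token now expires at " + newExpiry);
            // Token.cancel dispatches through the same lookup.
            token.cancel(conf);
        }
    }

The benefit of the indirection is that callers no longer need to know which filesystem issued the token; the token kind alone selects the right renewal code.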
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -43,12 +44,15 @@ public class HDFSPolicyProvider extends
     new Service("security.inter.datanode.protocol.acl",
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
-    new Service("security.refresh.policy.protocol.acl",
-                RefreshAuthorizationPolicyProtocol.class),
-    new Service("security.refresh.user.mappings.protocol.acl",
-                RefreshUserMappingsProtocol.class),
-    new Service("security.get.user.mappings.protocol.acl",
-                GetUserMappingsProtocol.class)
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY,
+        RefreshAuthorizationPolicyProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS,
+        RefreshUserMappingsProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS,
+        GetUserMappingsProtocol.class)
   };
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Nov  2 05:34:31 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.ref.WeakReference;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -32,9 +31,6 @@
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.TimeZone;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -49,6 +45,8 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -60,6 +58,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ServletUtil;
 import org.xml.sax.Attributes;
@@ -78,20 +77,28 @@ import org.xml.sax.helpers.XMLReaderFact
  */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class HftpFileSystem extends FileSystem {
+public class HftpFileSystem extends FileSystem
+    implements DelegationTokenRenewer.Renewable {
+  private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
+      = new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
+
   static {
     HttpURLConnection.setFollowRedirects(true);
+    dtRenewer.start();
   }
 
+  public static final Text TOKEN_KIND = new Text("HFTP delegation");
+
   private String nnHttpUrl;
-  private URI hdfsURI;
+  private Text hdfsServiceName;
+  private URI hftpURI;
   protected InetSocketAddress nnAddr;
   protected UserGroupInformation ugi;
 
   public static final String HFTP_TIMEZONE = "UTC";
   public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
-  private Token<DelegationTokenIdentifier> delegationToken;
-  public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
+  private Token<?> delegationToken;
+  private Token<?> renewToken;
 
   public static final SimpleDateFormat getDateFormat() {
     final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
@@ -106,19 +113,23 @@ public class HftpFileSystem extends File
     }
   };
 
-  private static RenewerThread renewer = new RenewerThread();
-  static {
-    renewer.start();
-  }
-
   @Override
   protected int getDefaultPort() {
-    return DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT;
+    return getDefaultSecurePort();
+
+    //TODO: un-comment the following once HDFS-7510 is committed.
+//    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+//        DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+  }
+
+  protected int getDefaultSecurePort() {
+    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
   }
 
   @Override
   public String getCanonicalServiceName() {
-    return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+    return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
   }
 
   private String buildUri(String schema, String host, int port) {
@@ -127,7 +138,6 @@ public class HftpFileSystem extends File
   }
 
 
-  @SuppressWarnings("unchecked")
   @Override
   public void initialize(final URI name, final Configuration conf)
       throws IOException {
@@ -144,17 +154,21 @@ public class HftpFileSystem extends File
     urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
         DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
 
-    nnHttpUrl =
-      buildUri("https://", NetUtils.normalizeHostName(name.getHost()), urlPort);
+    String normalizedNN = NetUtils.normalizeHostName(name.getHost());
+    nnHttpUrl = buildUri("https://", normalizedNN ,urlPort);
     LOG.debug("using url to get DT:" + nnHttpUrl);
+    try {
+      hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort));
+    } catch (URISyntaxException ue) {
+      throw new IOException("bad uri for hdfs", ue);
+    }
 
-
-
     // if one uses RPC port different from the Default one,
     // one should specify what is the setvice name for this delegation token
     // otherwise it is hostname:RPC_PORT
-    String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
-      SecurityUtil.buildDTServiceName(name, DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
+    String key = DelegationTokenSelector.SERVICE_NAME_KEY
+        + SecurityUtil.buildDTServiceName(name,
+            DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
     if(LOG.isDebugEnabled()) {
       LOG.debug("Trying to find DT for " + name + " using key=" + key +
           "; conf=" + conf.get(key, ""));
@@ -165,9 +179,10 @@ public class HftpFileSystem extends File
       nnPort = NetUtils.createSocketAddr(nnServiceName,
           NameNode.DEFAULT_PORT).getPort();
     }
-
     try {
-      hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort));
+      URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort);
+      hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
+          nnPort));
     } catch (URISyntaxException ue) {
       throw new IOException("bad uri for hdfs", ue);
    }
@@ -175,30 +190,73 @@ public class HftpFileSystem extends File
     if (UserGroupInformation.isSecurityEnabled()) {
       //try finding a token for this namenode (esp applicable for tasks
       //using hftp). If there exists one, just set the delegationField
-      String canonicalName = getCanonicalServiceName();
+      String hftpServiceName = getCanonicalServiceName();
       for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
-        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
-            t.getService().toString().equals(canonicalName)) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Found existing DT for " + name);
+        Text kind = t.getKind();
+        if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) {
+          if (t.getService().equals(hdfsServiceName)) {
+            setDelegationToken(t);
+            break;
+          }
+        } else if (TOKEN_KIND.equals(kind)) {
+          if (hftpServiceName
+              .equals(normalizeService(t.getService().toString()))) {
+            setDelegationToken(t);
+            break;
           }
-          delegationToken = (Token<DelegationTokenIdentifier>) t;
-          break;
         }
       }
 
       //since we don't already have a token, go get one over https
       if (delegationToken == null) {
-        delegationToken =
-          (Token<DelegationTokenIdentifier>) getDelegationToken(null);
-        renewer.addTokenToRenew(this);
+        setDelegationToken(getDelegationToken(null));
+        dtRenewer.addRenewAction(this);
       }
     }
   }
+
+  private String normalizeService(String service) {
+    int colonIndex = service.indexOf(':');
+    if (colonIndex == -1) {
+      throw new IllegalArgumentException("Invalid service for hftp token: " +
+          service);
+    }
+    String hostname =
+        NetUtils.normalizeHostName(service.substring(0, colonIndex));
+    String port = service.substring(colonIndex + 1);
+    return hostname + ":" + port;
+  }
+
+  //TODO: un-comment the following once HDFS-7510 is committed.
+//  protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
+//    Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
+//    return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
+//  }
+
+  protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
+    return DelegationTokenSelector.selectHdfsDelegationToken(
+        nnAddr, ugi, getConf());
+  }
 
   @Override
-  public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+  public Token<?> getRenewToken() {
+    return renewToken;
+  }
+
+  @Override
+  public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+    renewToken = token;
+    // emulate the 203 usage of the tokens
+    // by setting the kind and service as if they were hdfs tokens
+    delegationToken = new Token<T>(token);
+    delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    delegationToken.setService(hdfsServiceName);
+  }
+
+  @Override
+  public synchronized Token<?> getDelegationToken(final String renewer
+                                                  ) throws IOException {
     try {
       //Renew TGT if needed
       ugi.reloginFromKeytab();
@@ -221,7 +279,6 @@ public class HftpFileSystem extends File
           LOG.debug("Got dt for " + getUri() + ";t.service="
                     +t.getService());
         }
-        t.setService(new Text(getCanonicalServiceName()));
         return t;
       }
       return null;
@@ -594,157 +651,43 @@ public class HftpFileSystem extends File
     return cs != null? cs: super.getContentSummary(f);
   }
 
+  @InterfaceAudience.Private
+  public static class TokenManager extends TokenRenewer {
 
-  /**
-   * An action that will renew and replace the hftp file system's delegation
-   * tokens automatically.
-   */
-  private static class RenewAction implements Delayed {
-    // when should the renew happen
-    private long timestamp;
-    // a weak reference to the file system so that it can be garbage collected
-    private final WeakReference<HftpFileSystem> weakFs;
-
-    RenewAction(long timestamp, HftpFileSystem fs) {
-      this.timestamp = timestamp;
-      this.weakFs = new WeakReference<HftpFileSystem>(fs);
-    }
-
-    /**
-     * Get the delay until this event should happen.
-     */
     @Override
-    public long getDelay(TimeUnit unit) {
-      long millisLeft = timestamp - System.currentTimeMillis();
-      return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
+    public boolean handleKind(Text kind) {
+      return kind.equals(TOKEN_KIND);
     }
 
-    /**
-     * Compare two events in the same queue.
-     */
-    @Override
-    public int compareTo(Delayed o) {
-      if (o.getClass() != RenewAction.class) {
-        throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
-      }
-      RenewAction other = (RenewAction) o;
-      return timestamp < other.timestamp ? -1 :
-        (timestamp == other.timestamp ? 0 : 1);
-    }
-
     @Override
-    public int hashCode() {
-      assert false : "hashCode not designed";
-      return 33;
-    }
-
-    /**
-     * equals
-     */
-    @Override
-    public boolean equals(Object o) {
-      if(!( o instanceof Delayed))
-        return false;
-
-      return compareTo((Delayed) o) == 0;
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
    }
 
-    /**
-     * Set a new time for the renewal. Can only be called when the action
-     * is not in the queue.
-     * @param newTime the new time
-     */
-    public void setNewTime(long newTime) {
-      timestamp = newTime;
-    }
-
-    /**
-     * Renew or replace the delegation token for this file system.
-     * @return
-     * @throws IOException
-     */
     @SuppressWarnings("unchecked")
-    public boolean renew() throws IOException, InterruptedException {
-      final HftpFileSystem fs = weakFs.get();
-      if (fs != null) {
-        synchronized (fs) {
-          fs.ugi.reloginFromKeytab();
-          fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
-
-            @Override
-            public Void run() throws Exception {
-              try {
-                DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
-                    fs.delegationToken);
-              } catch (IOException ie) {
-                try {
-                  fs.delegationToken =
-                    (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
-                } catch (IOException ie2) {
-                  throw new IOException("Can't renew or get new delegation token ",
-                      ie);
-                }
-              }
-              return null;
-            }
-          });
-        }
-      }
-      return fs != null;
-    }
-
-    public String toString() {
-      StringBuilder result = new StringBuilder();
-      HftpFileSystem fs = weakFs.get();
-      if (fs == null) {
-        return "evaporated token renew";
-      }
-      synchronized (fs) {
-        result.append(fs.delegationToken);
-      }
-      result.append(" renew in ");
-      result.append(getDelay(TimeUnit.SECONDS));
-      result.append(" secs");
-      return result.toString();
+    @Override
+    public long renew(Token<?> token,
+                      Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to renew the token
+      return
+        DelegationTokenFetcher.renewDelegationToken
+        ("https://" + token.getService().toString(),
+         (Token<DelegationTokenIdentifier>) token);
     }
-  }
 
-  /**
-   * A daemon thread that waits for the next file system to renew.
-   */
-  private static class RenewerThread extends Thread {
-    private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
-    // wait for 95% of a day between renewals
-    private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
-
-    public RenewerThread() {
-      super("HFTP Delegation Token Renewer");
-      setDaemon(true);
-    }
-
-    public void addTokenToRenew(HftpFileSystem fs) {
-      queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
-    }
-
-    public void run() {
-      RenewAction action = null;
-      while (true) {
-        try {
-          action = queue.take();
-          if (action.renew()) {
-            action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
-            queue.add(action);
-          }
-          action = null;
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Exception ie) {
-          if (action != null) {
-            LOG.warn("Failure to renew token " + action, ie);
-          } else {
-            LOG.warn("Failure in renew queue", ie);
-          }
-        }
-      }
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token,
+                       Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      // use https to cancel the token
+      DelegationTokenFetcher.cancelDelegationToken
+        ("https://" + token.getService().toString(),
+         (Token<DelegationTokenIdentifier>) token);
     }
+
+  }
 }
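The commit replaces HftpFileSystem's hand-rolled renewer thread (a DelayQueue of weakly referenced RenewActions) with the inner TokenManager built on Hadoop's pluggable TokenRenewer hook. The pattern, reduced to its skeleton (a sketch with an invented token kind; real renewers are registered through the Java service-loader file META-INF/services/org.apache.hadoop.security.token.TokenRenewer):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenRenewer;

    /** Skeleton of a pluggable renewer for a hypothetical token kind. */
    public class DemoTokenRenewer extends TokenRenewer {
        private static final Text KIND = new Text("DEMO delegation"); // invented kind

        @Override
        public boolean handleKind(Text kind) {
            return KIND.equals(kind); // claim only our own token kind
        }

        @Override
        public boolean isManaged(Token<?> token) throws IOException {
            return true; // tokens of this kind can be renewed/cancelled here
        }

        @Override
        public long renew(Token<?> token, Configuration conf) throws IOException {
            // A real renewer would contact the issuing service here.
            return System.currentTimeMillis() + 24 * 60 * 60 * 1000L;
        }

        @Override
        public void cancel(Token<?> token, Configuration conf) throws IOException {
            // A real renewer would invalidate the token at the issuing service.
        }
    }

With this in place, Token.renew and Token.cancel find the renewer by kind, so the filesystem no longer needs its own daemon thread or weak-reference bookkeeping.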
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Nov  2 05:34:31 2011
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -408,11 +411,14 @@ public class RemoteBlockReader extends F
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
     checkSuccess(status, sock, block, file);
-    DataChecksum checksum = DataChecksum.newDataChecksum( in );
+    ReadOpChecksumInfoProto checksumInfo =
+      status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
     //Warning when we get CHECKSUM_NULL?
 
     // Read the first chunk offset.
-    long firstChunkOffset = in.readLong();
+    long firstChunkOffset = checksumInfo.getChunkOffset();
 
     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
         firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
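The read path now takes the checksum type and the first chunk offset from the structured BlockOpResponseProto instead of reading raw bytes off the stream; the offset sanity check that follows is unchanged. As a worked statement of that invariant (standalone, with illustrative numbers):

    /** The server may start the stream at the chunk boundary at or before the
     *  requested offset, but never after it and never a whole chunk early. */
    public class ChunkOffsetCheckDemo {
        static boolean validFirstChunkOffset(long firstChunkOffset, long startOffset,
                int bytesPerChecksum) {
            return firstChunkOffset >= 0
                && firstChunkOffset <= startOffset
                && firstChunkOffset > startOffset - bytesPerChecksum;
        }

        public static void main(String[] args) {
            // Reading from offset 1000 with 512-byte chunks: the stream must start
            // at chunk boundary 512 (the chunk containing offset 1000).
            System.out.println(validFirstChunkOffset(512, 1000, 512));  // true
            System.out.println(validFirstChunkOffset(0, 1000, 512));    // false: a chunk too early
            System.out.println(validFirstChunkOffset(1024, 1000, 512)); // false: past the offset
        }
    }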
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Nov  2 05:34:31 2011
@@ -37,10 +37,29 @@ import org.apache.hadoop.security.token.
     serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
-  public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
-
   /**
+   * Until version 9, this class ClientDatanodeProtocol served as both
+   * the client interface to the DN AND the RPC protocol used to
+   * communicate with the NN.
+   *
+   * Post version 10 (release 23 of Hadoop), the protocol is implemented in
+   * {@literal ../protocolR23Compatible/ClientDatanodeWireProtocol}
+   *
+   * This class is used by both the DFSClient and the
+   * DN server side to insulate from the protocol serialization.
+   *
+   * If you are adding/changing DN's interface then you need to
+   * change both this class and ALSO
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol}.
+   * These changes need to be done in a compatible fashion as described in
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+   *
+   * The log of historical changes can be retrieved from the svn).
    * 9: Added deleteBlockPool method
+   *
+   * 9 is the last version id when this class was used for protocols
+   * serialization. DO not update this version any further.
+   * Changes are recorded in R23 classes.
    */
   public static final long versionID = 9L;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Nov  2 05:34:31 2011
@@ -65,10 +65,28 @@ import org.apache.hadoop.hdfs.security.t
 public interface ClientProtocol extends VersionedProtocol {
 
   /**
-   * Compared to the previous version the following changes have been introduced:
-   * (Only the latest change is reflected.
+   * Until version 69, this class ClientProtocol served as both
+   * the client interface to the NN AND the RPC protocol used to
+   * communicate with the NN.
+   *
+   * Post version 70 (release 23 of Hadoop), the protocol is implemented in
+   * {@literal ../protocolR23Compatible/ClientNamenodeWireProtocol}
+   *
+   * This class is used by both the DFSClient and the
+   * NN server side to insulate from the protocol serialization.
+   *
+   * If you are adding/changing NN's interface then you need to
+   * change both this class and ALSO
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}.
+   * These changes need to be done in a compatible fashion as described in
+   * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+   *
    * The log of historical changes can be retrieved from the svn).
    * 69: Eliminate overloaded method names.
+   *
+   * 69L is the last version id when this class was used for protocols
+   * serialization. DO not update this version any further.
+   * Changes are recorded in R23 classes.
    */
   public static final long versionID = 69L;
@@ -373,11 +391,8 @@ public interface ClientProtocol extends
    * @return true if successful, or false if the old name does not exist
    * or if the new name already belongs to the namespace.
    *
-   * @throws IOException an I/O error occurred
-   *
-   * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
+   * @throws IOException an I/O error occurred
    */
-  @Deprecated
   public boolean rename(String src, String dst)
       throws UnresolvedLinkException, IOException;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Nov  2 05:34:31 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableComparable;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Nov  2 05:34:31 2011
@@ -75,6 +75,13 @@ public class DatanodeInfo extends Datano
     public String toString() {
       return value;
     }
+
+    public static AdminStates fromValue(final String value) {
+      for (AdminStates as : AdminStates.values()) {
+        if (as.value.equals(value)) return as;
+      }
+      return NORMAL;
+    }
   }
 
   @Nullable
@@ -110,11 +117,20 @@ public class DatanodeInfo extends Datano
     this.adminState = null;
   }
-  protected DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
+  public DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
     this(nodeID);
     this.location = location;
     this.hostName = hostName;
   }
+
+  public DatanodeInfo(DatanodeID nodeID, String location, String hostName,
+      final long capacity, final long dfsUsed, final long remaining,
+      final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
+      final AdminStates adminState) {
+    this(nodeID.getName(), nodeID.getStorageID(), nodeID.getInfoPort(), nodeID
+        .getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, lastUpdate,
+        xceiverCount, location, hostName, adminState);
+  }
 
   /** Constructor */
   public DatanodeInfo(final String name, final String storageID,
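The new AdminStates.fromValue does a linear scan over the enum values and deliberately falls back to NORMAL for unknown strings instead of throwing, which keeps deserialization lenient toward peers running other versions. The same pattern in isolation (the enum names and display strings below are illustrative):

    /** Lenient string-to-enum lookup, as in DatanodeInfo.AdminStates.fromValue. */
    public class EnumFromValueDemo {
        enum AdminState {
            NORMAL("In Service"),
            DECOMMISSION_INPROGRESS("Decommission In Progress"),
            DECOMMISSIONED("Decommissioned");

            private final String value;
            AdminState(String v) { value = v; }

            static AdminState fromValue(String value) {
                for (AdminState s : values()) {
                    if (s.value.equals(value)) return s;
                }
                return NORMAL; // unknown input degrades to the default, never throws
            }
        }

        public static void main(String[] args) {
            System.out.println(AdminState.fromValue("Decommissioned")); // DECOMMISSIONED
            System.out.println(AdminState.fromValue("something-else")); // NORMAL
        }
    }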
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Wed Nov  2 05:34:31 2011
@@ -26,11 +26,20 @@ import org.apache.hadoop.hdfs.HdfsConfig
  *
  ************************************/
 @InterfaceAudience.Private
-public final class HdfsConstants {
+public class HdfsConstants {
   /* Hidden constructor */
-  private HdfsConstants() {
+  protected HdfsConstants() {
   }
-
+
+  /**
+   * HDFS Protocol Names:
+   */
+  public static final String CLIENT_NAMENODE_PROTOCOL_NAME =
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+  public static final String CLIENT_DATANODE_PROTOCOL_NAME =
+      "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
+
+
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
   // Long that indicates "leave current quota unchanged"
@@ -63,7 +72,7 @@ public final class HdfsConstants {
   public static final int BYTES_IN_INTEGER = Integer.SIZE / Byte.SIZE;
 
   // SafeMode actions
-  public enum SafeModeAction {
+  public static enum SafeModeAction {
     SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Wed Nov  2 05:34:31 2011
@@ -241,6 +241,10 @@ public class HdfsFileStatus implements W
   final public String getSymlink() {
     return DFSUtil.bytes2String(symlink);
   }
+
+  final public byte[] getSymlinkInBytes() {
+    return symlink;
+  }
 
   //////////////////////////////////////////////////
   // Writable

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Wed Nov  2 05:34:31 2011
@@ -87,6 +87,7 @@ public abstract class HdfsProtoUtil {
       .setName(dni.getName())
       .setStorageID(dni.getStorageID())
       .setInfoPort(dni.getInfoPort())
+      .setIpcPort(dni.getIpcPort())
       .build();
   }
 
@@ -95,7 +96,7 @@ public abstract class HdfsProtoUtil {
         idProto.getName(),
         idProto.getStorageID(),
         idProto.getInfoPort(),
-        -1); // ipc port not serialized in writables either
+        idProto.getIpcPort());
   }
 
   //// DatanodeInfo ////

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Nov  2 05:34:31 2011
@@ -54,6 +54,11 @@ public class LocatedBlock implements Wri
   public LocatedBlock() {
     this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
   }
+
+
+  public LocatedBlock(ExtendedBlock eb) {
+    this(eb, new DatanodeInfo[0], 0L, false);
+  }
 
   public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
     this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Wed Nov  2 05:34:31 2011
@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
 
 
 /**
@@ -35,8 +41,20 @@ import org.apache.hadoop.security.token.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
 
+  /**
+   * Map between the internal DataChecksum identifiers and the protobuf-
+   * generated identifiers on the wire.
+   */
+  static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+    ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+      .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+      .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+      .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+      .build();
+
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
     return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -49,6 +67,28 @@ abstract class DataTransferProtoUtil {
         stage.name());
   }
 
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+    if (type == null) {
+      throw new IllegalArgumentException(
+          "Can't convert checksum to protobuf: " + checksum);
+    }
+
+    return ChecksumProto.newBuilder()
+      .setBytesPerChecksum(checksum.getBytesPerChecksum())
+      .setType(type)
+      .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    int type = checksumTypeMap.inverse().get(proto.getType());
+
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
   static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
       String client, Token<BlockTokenIdentifier> blockToken) {
     ClientOperationHeaderProto header =
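checksumTypeMap is a Guava ImmutableBiMap, so one table serves both conversion directions: toProto uses get() and fromProto uses inverse().get(). A compact demonstration of that round trip (standalone apart from Guava; the integer codes and enum below are placeholders, not the real DataChecksum constants or generated protobuf types):

    import com.google.common.collect.BiMap;
    import com.google.common.collect.ImmutableBiMap;

    public class BiMapRoundTripDemo {
        enum WireType { CRC32, CRC32C, NULL } // stand-in for the protobuf enum

        // Placeholder codes; DataChecksum defines the real ones.
        static final BiMap<Integer, WireType> TYPE_MAP =
            ImmutableBiMap.<Integer, WireType>builder()
                .put(1, WireType.CRC32)
                .put(2, WireType.CRC32C)
                .put(0, WireType.NULL)
                .build();

        public static void main(String[] args) {
            WireType wire = TYPE_MAP.get(2);               // internal -> wire
            int internal = TYPE_MAP.inverse().get(wire);   // wire -> internal
            System.out.println(wire + " <-> " + internal); // CRC32C <-> 2
        }
    }

Using a BiMap instead of two parallel maps guarantees the forward and reverse tables can never drift out of sync.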
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Wed Nov  2 05:34:31 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -84,7 +85,8 @@ public interface DataTransferProtocol {
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException;
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum) throws IOException;
 
   /**
    * Transfer a block to another datanode.

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Wed Nov  2 05:34:31 2011
@@ -103,7 +103,8 @@ public abstract class Receiver implement
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp());
+        proto.getLatestGenerationStamp(),
+        fromProto(proto.getRequestedChecksum()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Wed Nov  2 05:34:31 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.protobuf.Message;
 
@@ -93,10 +95,14 @@ public class Sender implements DataTrans
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
+
+    ChecksumProto checksumProto =
+      DataTransferProtoUtil.toProto(requestedChecksum);
+
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
       .addAllTargets(toProtos(targets, 1))
@@ -104,7 +110,8 @@ import org.apache.hadoop.hdfs.protocol.p
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(latestGenerationStamp);
+      .setLatestGenerationStamp(latestGenerationStamp)
+      .setRequestedChecksum(checksumProto);
 
     if (source != null) {
       proto.setSource(toProto(source));
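On the wire, the requested checksum is now just one more field on the OpWriteBlockProto builder. The builder pattern in miniature (a hand-rolled sketch with hypothetical types; the real OpWriteBlockProto builder is generated from the .proto file and works the same way):

    /** Mini-builder mirroring how OpWriteBlockProto gains requestedChecksum. */
    public class WriteBlockProtoDemo {
        static class OpWriteBlock {
            final int pipelineSize;
            final long latestGenerationStamp;
            final String requestedChecksum; // the field added by this commit
            private OpWriteBlock(Builder b) {
                pipelineSize = b.pipelineSize;
                latestGenerationStamp = b.latestGenerationStamp;
                requestedChecksum = b.requestedChecksum;
            }
            static class Builder {
                private int pipelineSize;
                private long latestGenerationStamp;
                private String requestedChecksum;
                Builder setPipelineSize(int n) { pipelineSize = n; return this; }
                Builder setLatestGenerationStamp(long gs) { latestGenerationStamp = gs; return this; }
                Builder setRequestedChecksum(String c) { requestedChecksum = c; return this; }
                OpWriteBlock build() { return new OpWriteBlock(this); }
            }
        }

        public static void main(String[] args) {
            OpWriteBlock op = new OpWriteBlock.Builder()
                .setPipelineSize(3)
                .setLatestGenerationStamp(1001L)
                .setRequestedChecksum("CRC32C/512") // analogous to setRequestedChecksum(checksumProto)
                .build();
            System.out.println(op.requestedChecksum);
        }
    }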
