Modified: hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1578348&r1=1578347&r2=1578348&view=diff ============================================================================== --- hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original) +++ hive/trunk/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Mar 17 12:56:10 2014 @@ -66,99 +66,121 @@ import org.apache.thrift.transport.TTran import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; - /** - * Functions that bridge Thrift's SASL transports to Hadoop's - * SASL callback handlers and authentication classes. - */ - public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge { - static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class); - - @Override - public Client createClient() { - return new Client(); - } - - @Override - public Client createClientWithConf(String authType) { - Configuration conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); - UserGroupInformation.setConfiguration(conf); - return new Client(); - } - - @Override - public Server createServer(String keytabFile, String principalConf) throws TTransportException { - return new Server(keytabFile, principalConf); - } - - /** - * Read and return Hadoop SASL configuration which can be configured using - * "hadoop.rpc.protection" - * @param conf - * @return Hadoop SASL configuration - */ - @Override - public Map<String, String> getHadoopSaslProperties(Configuration conf) { - // Initialize the SaslRpcServer to ensure QOP parameters are read from conf - SaslRpcServer.init(conf); - return SaslRpcServer.SASL_PROPS; - } - - public static class Client extends HadoopThriftAuthBridge.Client { - /** - * Create a client-side SASL transport that wraps an underlying transport. - * - * @param method The authentication method to use. Currently only KERBEROS is - * supported. - * @param serverPrincipal The Kerberos principal of the target server. - * @param underlyingTransport The underlying transport mechanism, usually a TSocket. - * @param saslProps the sasl properties to create the client with - */ - - @Override - public TTransport createClientTransport( - String principalConfig, String host, - String methodStr, String tokenStrForm, TTransport underlyingTransport, - Map<String, String> saslProps) throws IOException { - AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); - - TTransport saslTransport = null; - switch (method) { - case DIGEST: - Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>(); - t.decodeFromUrlString(tokenStrForm); - saslTransport = new TSaslClientTransport( +/** + * Functions that bridge Thrift's SASL transports to Hadoop's + * SASL callback handlers and authentication classes. + */ +public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge { + static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class); + + @Override + public Client createClient() { + return new Client(); + } + + @Override + public Client createClientWithConf(String authType) { + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); + UserGroupInformation.setConfiguration(conf); + return new Client(); + } + + @Override + public Server createServer(String keytabFile, String principalConf) throws TTransportException { + return new Server(keytabFile, principalConf); + } + + @Override + public String getServerPrincipal(String principalConfig, String host) + throws IOException { + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + return serverPrincipal; + } + + @Override + public UserGroupInformation getCurrentUGIWithConf(String authType) + throws IOException { + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); + UserGroupInformation.setConfiguration(conf); + return UserGroupInformation.getCurrentUser(); + } + + /** + * Read and return Hadoop SASL configuration which can be configured using + * "hadoop.rpc.protection" + * @param conf + * @return Hadoop SASL configuration + */ + @Override + public Map<String, String> getHadoopSaslProperties(Configuration conf) { + // Initialize the SaslRpcServer to ensure QOP parameters are read from conf + SaslRpcServer.init(conf); + return SaslRpcServer.SASL_PROPS; + } + + public static class Client extends HadoopThriftAuthBridge.Client { + /** + * Create a client-side SASL transport that wraps an underlying transport. + * + * @param method The authentication method to use. Currently only KERBEROS is + * supported. + * @param serverPrincipal The Kerberos principal of the target server. + * @param underlyingTransport The underlying transport mechanism, usually a TSocket. + * @param saslProps the sasl properties to create the client with + */ + + @Override + public TTransport createClientTransport( + String principalConfig, String host, + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map<String, String> saslProps) throws IOException { + AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); + + TTransport saslTransport = null; + switch (method) { + case DIGEST: + Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>(); + t.decodeFromUrlString(tokenStrForm); + saslTransport = new TSaslClientTransport( method.getMechanismName(), null, null, SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslClientCallbackHandler(t), underlyingTransport); - return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + + case KERBEROS: + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + try { + saslTransport = new TSaslClientTransport( + method.getMechanismName(), + null, + names[0], names[1], + saslProps, null, + underlyingTransport); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + } catch (SaslException se) { + throw new IOException("Could not instantiate SASL transport", se); + } - case KERBEROS: - String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); - String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal name does NOT have the expected hostname part: " - + serverPrincipal); - } - try { - saslTransport = new TSaslClientTransport( - method.getMechanismName(), - null, - names[0], names[1], - saslProps, null, - underlyingTransport); - return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); - } catch (SaslException se) { - throw new IOException("Could not instantiate SASL transport", se); - } - - default: - throw new IOException("Unsupported authentication method: " + method); - } - } + default: + throw new IOException("Unsupported authentication method: " + method); + } + } private static class SaslClientCallbackHandler implements CallbackHandler { private final String userName; private final char[] userPassword; @@ -168,8 +190,9 @@ import org.apache.thrift.transport.TTran this.userPassword = encodePassword(token.getPassword()); } + @Override public void handle(Callback[] callbacks) - throws UnsupportedCallbackException { + throws UnsupportedCallbackException { NameCallback nc = null; PasswordCallback pc = null; RealmCallback rc = null; @@ -214,253 +237,254 @@ import org.apache.thrift.transport.TTran static char[] encodePassword(byte[] password) { return new String(Base64.encodeBase64(password)).toCharArray(); - } - } - } - - public static class Server extends HadoopThriftAuthBridge.Server { - final UserGroupInformation realUgi; - DelegationTokenSecretManager secretManager; - private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour - //Delegation token related keys - public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = - "hive.cluster.delegation.key.update-interval"; - public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = - "hive.cluster.delegation.token.renew-interval"; - public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = - "hive.cluster.delegation.token.max-lifetime"; - public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = - 7*24*60*60*1000; // 7 days - public static final String DELEGATION_TOKEN_STORE_CLS = - "hive.cluster.delegation.token.store.class"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = - "hive.cluster.delegation.token.store.zookeeper.connectString"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = - "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = - "hive.cluster.delegation.token.store.zookeeper.znode"; - public static final String DELEGATION_TOKEN_STORE_ZK_ACL = - "hive.cluster.delegation.token.store.zookeeper.acl"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hive/cluster/delegation"; - - public Server() throws TTransportException { - try { - realUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException ioe) { - throw new TTransportException(ioe); - } - } - /** - * Create a server with a kerberos keytab/principal. - */ - protected Server(String keytabFile, String principalConf) - throws TTransportException { - if (keytabFile == null || keytabFile.isEmpty()) { - throw new TTransportException("No keytab specified"); - } - if (principalConf == null || principalConf.isEmpty()) { - throw new TTransportException("No principal specified"); - } - - // Login from the keytab - String kerberosName; - try { - kerberosName = - SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0"); - UserGroupInformation.loginUserFromKeytab( - kerberosName, keytabFile); - realUgi = UserGroupInformation.getLoginUser(); - assert realUgi.isFromKeytab(); - } catch (IOException ioe) { - throw new TTransportException(ioe); - } - } - - /** - * Create a TTransportFactory that, upon connection of a client socket, - * negotiates a Kerberized SASL transport. The resulting TTransportFactory - * can be passed as both the input and output transport factory when - * instantiating a TThreadPoolServer, for example. - * - * @param saslProps Map of SASL properties - */ - @Override - public TTransportFactory createTransportFactory(Map<String, String> saslProps) - throws TTransportException { - // Parse out the kerberos principal, host, realm. - String kerberosName = realUgi.getUserName(); - final String names[] = SaslRpcServer.splitKerberosName(kerberosName); - if (names.length != 3) { - throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); - } - - TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); - transFactory.addServerDefinition( - AuthMethod.KERBEROS.getMechanismName(), - names[0], names[1], // two parts of kerberos principal - saslProps, - new SaslRpcServer.SaslGssCallbackHandler()); - transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), + } + } + } + + public static class Server extends HadoopThriftAuthBridge.Server { + final UserGroupInformation realUgi; + DelegationTokenSecretManager secretManager; + private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour + //Delegation token related keys + public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = + "hive.cluster.delegation.key.update-interval"; + public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = + "hive.cluster.delegation.token.renew-interval"; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = + "hive.cluster.delegation.token.max-lifetime"; + public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = + 7*24*60*60*1000; // 7 days + public static final String DELEGATION_TOKEN_STORE_CLS = + "hive.cluster.delegation.token.store.class"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = + "/hive/cluster/delegation"; + + public Server() throws TTransportException { + try { + realUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + /** + * Create a server with a kerberos keytab/principal. + */ + protected Server(String keytabFile, String principalConf) + throws TTransportException { + if (keytabFile == null || keytabFile.isEmpty()) { + throw new TTransportException("No keytab specified"); + } + if (principalConf == null || principalConf.isEmpty()) { + throw new TTransportException("No principal specified"); + } + + // Login from the keytab + String kerberosName; + try { + kerberosName = + SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0"); + UserGroupInformation.loginUserFromKeytab( + kerberosName, keytabFile); + realUgi = UserGroupInformation.getLoginUser(); + assert realUgi.isFromKeytab(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + + /** + * Create a TTransportFactory that, upon connection of a client socket, + * negotiates a Kerberized SASL transport. The resulting TTransportFactory + * can be passed as both the input and output transport factory when + * instantiating a TThreadPoolServer, for example. + * + * @param saslProps Map of SASL properties + */ + @Override + public TTransportFactory createTransportFactory(Map<String, String> saslProps) + throws TTransportException { + // Parse out the kerberos principal, host, realm. + String kerberosName = realUgi.getUserName(); + final String names[] = SaslRpcServer.splitKerberosName(kerberosName); + if (names.length != 3) { + throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); + } + + TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); + transFactory.addServerDefinition( + AuthMethod.KERBEROS.getMechanismName(), + names[0], names[1], // two parts of kerberos principal + saslProps, + new SaslRpcServer.SaslGssCallbackHandler()); + transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(secretManager)); - return new TUGIAssumingTransportFactory(transFactory, realUgi); - } + return new TUGIAssumingTransportFactory(transFactory, realUgi); + } + + /** + * Wrap a TProcessor in such a way that, before processing any RPC, it + * assumes the UserGroupInformation of the user authenticated by + * the SASL transport. + */ + @Override + public TProcessor wrapProcessor(TProcessor processor) { + return new TUGIAssumingProcessor(processor, secretManager, true); + } - /** - * Wrap a TProcessor in such a way that, before processing any RPC, it - * assumes the UserGroupInformation of the user authenticated by - * the SASL transport. - */ - @Override - public TProcessor wrapProcessor(TProcessor processor) { - return new TUGIAssumingProcessor(processor, secretManager, true); - } - - /** - * Wrap a TProcessor to capture the client information like connecting userid, ip etc - */ - @Override - public TProcessor wrapNonAssumingProcessor(TProcessor processor) { + /** + * Wrap a TProcessor to capture the client information like connecting userid, ip etc + */ + @Override + public TProcessor wrapNonAssumingProcessor(TProcessor processor) { return new TUGIAssumingProcessor(processor, secretManager, false); - } + } protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException { - String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); - if (StringUtils.isBlank(tokenStoreClassName)) { - return new MemoryTokenStore(); - } - try { + String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClassName)) { + return new MemoryTokenStore(); + } + try { Class<? extends DelegationTokenStore> storeClass = Class .forName(tokenStoreClassName).asSubclass( DelegationTokenStore.class); return ReflectionUtils.newInstance(storeClass, conf); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); - } - } + } + } + + @Override + public void startDelegationTokenSecretManager(Configuration conf, Object hms) + throws IOException{ + long secretKeyInterval = + conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, + DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, + DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, + DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + + DelegationTokenStore dts = getTokenStore(conf); + dts.setStore(hms); + secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, + tokenMaxLifetime, + tokenRenewInterval, + DELEGATION_TOKEN_GC_INTERVAL, dts); + secretManager.startThreads(); + } + + @Override + public String getDelegationToken(final String owner, final String renewer) + throws IOException, InterruptedException { + if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { + throw new AuthorizationException( + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); + } + //if the user asking the token is same as the 'owner' then don't do + //any proxy authorization checks. For cases like oozie, where it gets + //a delegation token for another user, we need to make sure oozie is + //authorized to get a delegation token. + //Do all checks on short names + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { + //in the case of proxy users, the getCurrentUser will return the + //real user (for e.g. oozie) due to the doAs that happened just before the + //server started executing the method getDelegationToken in the MetaStore + ownerUgi = UserGroupInformation.createProxyUser(owner, + UserGroupInformation.getCurrentUser()); + InetAddress remoteAddr = getRemoteAddress(); + ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); + } + return ownerUgi.doAs(new PrivilegedExceptionAction<String>() { + @Override + public String run() throws IOException { + return secretManager.getDelegationToken(renewer); + } + }); + } + + @Override + public String getDelegationTokenWithService(String owner, String renewer, String service) + throws IOException, InterruptedException { + String token = getDelegationToken(owner, renewer); + return ShimLoader.getHadoopShims().addServiceToToken(token, service); + } + + @Override + public long renewDelegationToken(String tokenStrForm) throws IOException { + if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { + throw new AuthorizationException( + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); + } + return secretManager.renewDelegationToken(tokenStrForm); + } + + @Override + public String getUserFromToken(String tokenStr) throws IOException { + return secretManager.getUserFromToken(tokenStr); + } + + @Override + public void cancelDelegationToken(String tokenStrForm) throws IOException { + secretManager.cancelDelegationToken(tokenStrForm); + } + + final static ThreadLocal<InetAddress> remoteAddress = + new ThreadLocal<InetAddress>() { + @Override + protected synchronized InetAddress initialValue() { + return null; + } + }; + + @Override + public InetAddress getRemoteAddress() { + return remoteAddress.get(); + } + + final static ThreadLocal<AuthenticationMethod> authenticationMethod = + new ThreadLocal<AuthenticationMethod>() { + @Override + protected synchronized AuthenticationMethod initialValue() { + return AuthenticationMethod.TOKEN; + } + }; + + private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () { + @Override + protected synchronized String initialValue() { + return null; + } + }; - @Override - public void startDelegationTokenSecretManager(Configuration conf, Object hms) - throws IOException{ - long secretKeyInterval = - conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, - DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); - long tokenMaxLifetime = - conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, - DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - long tokenRenewInterval = - conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, - DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - - DelegationTokenStore dts = getTokenStore(conf); - dts.setStore(hms); - secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, - tokenRenewInterval, - DELEGATION_TOKEN_GC_INTERVAL, dts); - secretManager.startThreads(); - } - - @Override - public String getDelegationToken(final String owner, final String renewer) - throws IOException, InterruptedException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - //if the user asking the token is same as the 'owner' then don't do - //any proxy authorization checks. For cases like oozie, where it gets - //a delegation token for another user, we need to make sure oozie is - //authorized to get a delegation token. - //Do all checks on short names - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); - if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { - //in the case of proxy users, the getCurrentUser will return the - //real user (for e.g. oozie) due to the doAs that happened just before the - //server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, - UserGroupInformation.getCurrentUser()); - InetAddress remoteAddr = getRemoteAddress(); - ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); - } - return ownerUgi.doAs(new PrivilegedExceptionAction<String>() { - public String run() throws IOException { - return secretManager.getDelegationToken(renewer); - } - }); - } - - @Override - public String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException { - String token = getDelegationToken(owner, renewer); - return ShimLoader.getHadoopShims().addServiceToToken(token, service); - } - - @Override - public long renewDelegationToken(String tokenStrForm) throws IOException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - return secretManager.renewDelegationToken(tokenStrForm); - } - - @Override - public String getUserFromToken(String tokenStr) throws IOException { - return secretManager.getUserFromToken(tokenStr); - } - - @Override - public void cancelDelegationToken(String tokenStrForm) throws IOException { - secretManager.cancelDelegationToken(tokenStrForm); - } - - final static ThreadLocal<InetAddress> remoteAddress = - new ThreadLocal<InetAddress>() { - @Override - protected synchronized InetAddress initialValue() { - return null; - } - }; - - @Override - public InetAddress getRemoteAddress() { - return remoteAddress.get(); - } - - final static ThreadLocal<AuthenticationMethod> authenticationMethod = - new ThreadLocal<AuthenticationMethod>() { - @Override - protected synchronized AuthenticationMethod initialValue() { - return AuthenticationMethod.TOKEN; - } - }; - - private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () { - @Override - protected synchronized String initialValue() { - return null; - } - }; - - @Override - public String getRemoteUser() { - return remoteUser.get(); - } + @Override + public String getRemoteUser() { + return remoteUser.get(); + } /** CallbackHandler for SASL DIGEST-MD5 mechanism */ // This code is pretty much completely based on Hadoop's @@ -501,12 +525,12 @@ import org.apache.thrift.transport.TTran continue; // realm is ignored } else { throw new UnsupportedCallbackException(callback, - "Unrecognized SASL DIGEST-MD5 Callback"); + "Unrecognized SASL DIGEST-MD5 Callback"); } } if (pc != null) { DelegationTokenIdentifier tokenIdentifier = SaslRpcServer. - getIdentifier(nc.getDefaultName(), secretManager); + getIdentifier(nc.getDefaultName(), secretManager); char[] password = getPassword(tokenIdentifier); if (LOG.isDebugEnabled()) { @@ -526,7 +550,7 @@ import org.apache.thrift.transport.TTran if (ac.isAuthorized()) { if (LOG.isDebugEnabled()) { String username = - SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName(); + SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName(); LOG.debug("SASL server DIGEST-MD5 callback: setting " + "canonicalized client ID: " + username); } @@ -534,117 +558,120 @@ import org.apache.thrift.transport.TTran } } } - } + } - /** - * Processor that pulls the SaslServer object out of the transport, and - * assumes the remote user's UGI before calling through to the original - * processor. - * - * This is used on the server side to set the UGI for each specific call. - */ - protected class TUGIAssumingProcessor implements TProcessor { - final TProcessor wrapped; - DelegationTokenSecretManager secretManager; - boolean useProxy; - TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager, - boolean useProxy) { - this.wrapped = wrapped; - this.secretManager = secretManager; - this.useProxy = useProxy; - } - - public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { - TTransport trans = inProt.getTransport(); - if (!(trans instanceof TSaslServerTransport)) { - throw new TException("Unexpected non-SASL transport " + trans.getClass()); - } - TSaslServerTransport saslTrans = (TSaslServerTransport)trans; - SaslServer saslServer = saslTrans.getSaslServer(); - String authId = saslServer.getAuthorizationID(); - authenticationMethod.set(AuthenticationMethod.KERBEROS); - LOG.debug("AUTH ID ======>" + authId); - String endUser = authId; - - if(saslServer.getMechanismName().equals("DIGEST-MD5")) { - try { - TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId, - secretManager); - endUser = tokenId.getUser().getUserName(); - authenticationMethod.set(AuthenticationMethod.TOKEN); - } catch (InvalidToken e) { - throw new TException(e.getMessage()); - } - } - Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket(); - remoteAddress.set(socket.getInetAddress()); - UserGroupInformation clientUgi = null; - try { - if (useProxy) { - clientUgi = UserGroupInformation.createProxyUser( - endUser, UserGroupInformation.getLoginUser()); - remoteUser.set(clientUgi.getShortUserName()); - return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() { - public Boolean run() { - try { - return wrapped.process(inProt, outProt); - } catch (TException te) { - throw new RuntimeException(te); - } - } - }); - } else { - remoteUser.set(endUser); - return wrapped.process(inProt, outProt); - } - } catch (RuntimeException rte) { - if (rte.getCause() instanceof TException) { - throw (TException)rte.getCause(); - } - throw rte; - } catch (InterruptedException ie) { - throw new RuntimeException(ie); // unexpected! - } catch (IOException ioe) { - throw new RuntimeException(ioe); // unexpected! - } - finally { - if (clientUgi != null) { - try { FileSystem.closeAllForUGI(clientUgi); } - catch(IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); + /** + * Processor that pulls the SaslServer object out of the transport, and + * assumes the remote user's UGI before calling through to the original + * processor. + * + * This is used on the server side to set the UGI for each specific call. + */ + protected class TUGIAssumingProcessor implements TProcessor { + final TProcessor wrapped; + DelegationTokenSecretManager secretManager; + boolean useProxy; + TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager, + boolean useProxy) { + this.wrapped = wrapped; + this.secretManager = secretManager; + this.useProxy = useProxy; + } + + @Override + public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { + TTransport trans = inProt.getTransport(); + if (!(trans instanceof TSaslServerTransport)) { + throw new TException("Unexpected non-SASL transport " + trans.getClass()); + } + TSaslServerTransport saslTrans = (TSaslServerTransport)trans; + SaslServer saslServer = saslTrans.getSaslServer(); + String authId = saslServer.getAuthorizationID(); + authenticationMethod.set(AuthenticationMethod.KERBEROS); + LOG.debug("AUTH ID ======>" + authId); + String endUser = authId; + + if(saslServer.getMechanismName().equals("DIGEST-MD5")) { + try { + TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId, + secretManager); + endUser = tokenId.getUser().getUserName(); + authenticationMethod.set(AuthenticationMethod.TOKEN); + } catch (InvalidToken e) { + throw new TException(e.getMessage()); + } + } + Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket(); + remoteAddress.set(socket.getInetAddress()); + UserGroupInformation clientUgi = null; + try { + if (useProxy) { + clientUgi = UserGroupInformation.createProxyUser( + endUser, UserGroupInformation.getLoginUser()); + remoteUser.set(clientUgi.getShortUserName()); + return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() { + try { + return wrapped.process(inProt, outProt); + } catch (TException te) { + throw new RuntimeException(te); + } } + }); + } else { + remoteUser.set(endUser); + return wrapped.process(inProt, outProt); + } + } catch (RuntimeException rte) { + if (rte.getCause() instanceof TException) { + throw (TException)rte.getCause(); + } + throw rte; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); // unexpected! + } catch (IOException ioe) { + throw new RuntimeException(ioe); // unexpected! + } + finally { + if (clientUgi != null) { + try { FileSystem.closeAllForUGI(clientUgi); } + catch(IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); + } } - } - } - } + } + } + } /** - * A TransportFactory that wraps another one, but assumes a specified UGI - * before calling through. - * - * This is used on the server side to assume the server's Principal when accepting - * clients. - */ - static class TUGIAssumingTransportFactory extends TTransportFactory { - private final UserGroupInformation ugi; - private final TTransportFactory wrapped; - - public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) { - assert wrapped != null; - assert ugi != null; - - this.wrapped = wrapped; - this.ugi = ugi; - } - - @Override - public TTransport getTransport(final TTransport trans) { - return ugi.doAs(new PrivilegedAction<TTransport>() { - public TTransport run() { - return wrapped.getTransport(trans); - } - }); - } - } - } - } + * A TransportFactory that wraps another one, but assumes a specified UGI + * before calling through. + * + * This is used on the server side to assume the server's Principal when accepting + * clients. + */ + static class TUGIAssumingTransportFactory extends TTransportFactory { + private final UserGroupInformation ugi; + private final TTransportFactory wrapped; + + public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) { + assert wrapped != null; + assert ugi != null; + + this.wrapped = wrapped; + this.ugi = ugi; + } + + @Override + public TTransport getTransport(final TTransport trans) { + return ugi.doAs(new PrivilegedAction<TTransport>() { + @Override + public TTransport run() { + return wrapped.getTransport(trans); + } + }); + } + } + } +}
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1578348&r1=1578347&r2=1578348&view=diff ============================================================================== --- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original) +++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Mar 17 12:56:10 2014 @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -80,15 +79,15 @@ public interface HadoopShims { * @return TaskAttempt Log Url */ String getTaskAttemptLogUrl(JobConf conf, - String taskTrackerHttpAddress, - String taskAttemptId) - throws MalformedURLException; + String taskTrackerHttpAddress, + String taskAttemptId) + throws MalformedURLException; /** * Returns a shim to wrap MiniMrCluster */ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException; + String nameNode, int numDir) throws IOException; /** * Shim for MiniMrCluster @@ -125,7 +124,7 @@ public interface HadoopShims { String archiveName) throws Exception; public URI getHarUri(URI original, URI base, URI originalBase) - throws URISyntaxException; + throws URISyntaxException; /** * Hive uses side effect files exclusively for it's output. It also manages * the setup/cleanup/commit of output from the hive client. As a result it does @@ -165,7 +164,7 @@ public interface HadoopShims { * @throws InterruptedException */ public <T> T doAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> pvea) throws - IOException, InterruptedException; + IOException, InterruptedException; /** * Once a delegation token is stored in a file, the location is specified @@ -188,13 +187,13 @@ public interface HadoopShims { /** - * Used by metastore server to creates UGI object for a remote user. + * Used to creates UGI object for a remote user. * @param userName remote User Name * @param groupNames group names associated with remote user name * @return UGI created for the remote user. */ - public UserGroupInformation createRemoteUser(String userName, List<String> groupNames); + /** * Get the short name corresponding to the subject in the passed UGI * @@ -240,7 +239,7 @@ public interface HadoopShims { * @throws IOException */ public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService) - throws IOException; + throws IOException; /** * Add given service to the string format token @@ -250,7 +249,7 @@ public interface HadoopShims { * @throws IOException */ public String addServiceToToken(String tokenStr, String tokenService) - throws IOException; + throws IOException; enum JobTrackerState { INITIALIZING, RUNNING }; @@ -331,7 +330,7 @@ public interface HadoopShims { * @throws IOException */ public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf) - throws IOException; + throws IOException; /** * Get the default block size for the path. FileSystem alone is not sufficient to @@ -382,6 +381,7 @@ public interface HadoopShims { public interface InputSplitShim extends InputSplit { JobConf getJob(); + @Override long getLength(); /** Returns an array containing the startoffsets of the files in the split. */ @@ -406,14 +406,18 @@ public interface HadoopShims { Path[] getPaths(); /** Returns all the Paths where this input-split resides. */ + @Override String[] getLocations() throws IOException; void shrinkSplit(long length); + @Override String toString(); + @Override void readFields(DataInput in) throws IOException; + @Override void write(DataOutput out) throws IOException; } @@ -445,7 +449,7 @@ public interface HadoopShims { * @throws IOException */ Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path, - PathFilter filter) throws IOException; + PathFilter filter) throws IOException; /** * For file status returned by listLocatedStatus, convert them into a list @@ -456,7 +460,7 @@ public interface HadoopShims { * @throws IOException */ BlockLocation[] getLocations(FileSystem fs, - FileStatus status) throws IOException; + FileStatus status) throws IOException; public HCatHadoopShims getHCatShim(); public interface HCatHadoopShims { @@ -468,10 +472,10 @@ public interface HadoopShims { public TaskAttemptID createTaskAttemptID(); public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, - TaskAttemptID taskId); + TaskAttemptID taskId); public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf, - org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable); + org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable); public JobContext createJobContext(Configuration conf, JobID jobId); @@ -603,7 +607,7 @@ public interface HadoopShims { } public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec); - + /** * Get configuration from JobContext */ Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1578348&r1=1578347&r2=1578348&view=diff ============================================================================== --- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original) +++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Mon Mar 17 12:56:10 2014 @@ -16,39 +16,53 @@ * limitations under the License. */ - package org.apache.hadoop.hive.thrift; +package org.apache.hadoop.hive.thrift; - import java.io.IOException; +import java.io.IOException; import java.net.InetAddress; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TProcessor; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; - /** - * This class is only overridden by the secure hadoop shim. It allows - * the Thrift SASL support to bridge to Hadoop's UserGroupInformation - * & DelegationToken infrastructure. - */ - public class HadoopThriftAuthBridge { - public Client createClient() { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } - - public Client createClientWithConf(String authType) { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } - - public Server createServer(String keytabFile, String principalConf) - throws TTransportException { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } +/** + * This class is only overridden by the secure hadoop shim. It allows + * the Thrift SASL support to bridge to Hadoop's UserGroupInformation + * & DelegationToken infrastructure. + */ +public class HadoopThriftAuthBridge { + public Client createClient() { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public Client createClientWithConf(String authType) { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public UserGroupInformation getCurrentUGIWithConf(String authType) + throws IOException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + + public String getServerPrincipal(String principalConfig, String host) + throws IOException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public Server createServer(String keytabFile, String principalConf) + throws TTransportException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } /** @@ -58,47 +72,47 @@ import org.apache.thrift.transport.TTran * @param conf * @return Hadoop SASL configuration */ - public Map<String, String> getHadoopSaslProperties(Configuration conf) { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } - - public static abstract class Client { - /** - * - * @param principalConfig In the case of Kerberos authentication this will - * be the kerberos principal name, for DIGEST-MD5 (delegation token) based - * authentication this will be null - * @param host The metastore server host name - * @param methodStr "KERBEROS" or "DIGEST" - * @param tokenStrForm This is url encoded string form of - * org.apache.hadoop.security.token. - * @param underlyingTransport the underlying transport - * @return the transport - * @throws IOException - */ - public abstract TTransport createClientTransport( - String principalConfig, String host, - String methodStr, String tokenStrForm, TTransport underlyingTransport, - Map<String, String> saslProps) - throws IOException; - } - - public static abstract class Server { - public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException; - public abstract TProcessor wrapProcessor(TProcessor processor); - public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); - public abstract InetAddress getRemoteAddress(); - public abstract void startDelegationTokenSecretManager(Configuration conf, - Object hmsHandler) throws IOException; - public abstract String getDelegationToken(String owner, String renewer) - throws IOException, InterruptedException; - public abstract String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException; - public abstract String getRemoteUser(); - public abstract long renewDelegationToken(String tokenStrForm) throws IOException; - public abstract void cancelDelegationToken(String tokenStrForm) throws IOException; - public abstract String getUserFromToken(String tokenStr) throws IOException; - } - } + public Map<String, String> getHadoopSaslProperties(Configuration conf) { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public static abstract class Client { + /** + * + * @param principalConfig In the case of Kerberos authentication this will + * be the kerberos principal name, for DIGEST-MD5 (delegation token) based + * authentication this will be null + * @param host The metastore server host name + * @param methodStr "KERBEROS" or "DIGEST" + * @param tokenStrForm This is url encoded string form of + * org.apache.hadoop.security.token. + * @param underlyingTransport the underlying transport + * @return the transport + * @throws IOException + */ + public abstract TTransport createClientTransport( + String principalConfig, String host, + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map<String, String> saslProps) + throws IOException; + } + + public static abstract class Server { + public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException; + public abstract TProcessor wrapProcessor(TProcessor processor); + public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); + public abstract InetAddress getRemoteAddress(); + public abstract void startDelegationTokenSecretManager(Configuration conf, + Object hmsHandler) throws IOException; + public abstract String getDelegationToken(String owner, String renewer) + throws IOException, InterruptedException; + public abstract String getDelegationTokenWithService(String owner, String renewer, String service) + throws IOException, InterruptedException; + public abstract String getRemoteUser(); + public abstract long renewDelegationToken(String tokenStrForm) throws IOException; + public abstract void cancelDelegationToken(String tokenStrForm) throws IOException; + public abstract String getUserFromToken(String tokenStr) throws IOException; + } +}
