Repository: tez Updated Branches: refs/heads/branch-0.7 5c1b91064 -> db9864c33
TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. (Prakash Ramachandran via hitesh) (cherry picked from commit f0b9d7ec7472d6c4b932c249f6106c5b6ca85b88) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db9864c3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db9864c3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db9864c3 Branch: refs/heads/branch-0.7 Commit: db9864c33d5f5a3ffb60b8d03089014235d3133c Parents: 5c1b910 Author: Hitesh Shah <[email protected]> Authored: Mon May 11 11:44:53 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon May 11 11:45:40 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../tez/dag/api/client/DAGClientImpl.java | 5 - .../dag/api/client/DAGClientTimelineImpl.java | 187 +++++++++++++++++-- .../tez/dag/api/client/TestATSHttpClient.java | 2 +- 4 files changed, 171 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/db9864c3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index edc2ba6..a154ec5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES Default max limit increased. Should not affect existing users. ALL CHANGES: + TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. TEZ-2435. Add public key to KEYS TEZ-2421. Deadlock in AM because attempt and vertex locking each other out TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts. @@ -181,6 +182,7 @@ Release 0.6.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. TEZ-2396. pig-tez-tfile-parser pom is hard coded to depend on 0.6.0-SNAPSHOT version. TEZ-2237. Valid events should be sent out when an Output is not started. TEZ-1988. Tez UI: does not work when using file:// in a browser @@ -357,6 +359,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster. TEZ-2369. Add a few unit tests for RootInputInitializerManager. TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: T_ATTEMPT_KILLED at KILLED. http://git-wip-us.apache.org/repos/asf/tez/blob/db9864c3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index de6ede6..b0ad51c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -93,11 +93,6 @@ public class DAGClientImpl extends DAGClient { conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); - if (UserGroupInformation.isSecurityEnabled()){ - //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529 - isATSEnabled = false; - } - realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient); statusPollInterval = conf.getLong( TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, http://git-wip-us.apache.org/repos/asf/tez/blob/db9864c3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java index cc000df..d0b11d6 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java @@ -19,13 +19,20 @@ package org.apache.tez.dag.api.client; import javax.annotation.Nullable; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.UndeclaredThrowableException; import java.net.HttpURLConnection; +import java.net.URI; import java.net.URL; -import java.net.URLEncoder; +import java.net.URLConnection; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -45,6 +52,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -81,8 +96,13 @@ public class DAGClientTimelineImpl extends DAGClient { private static Client httpClient = null; private final ApplicationId appId; private final String dagId; - private final TezConfiguration conf; private final FrameworkClient frameworkClient; + private final UserGroupInformation authUgi; + private final String doAsUser; + private final DelegationTokenAuthenticator authenticator; + private final DelegationTokenAuthenticatedURL.Token token; + private final ConnectionConfigurator connConfigurator; + private final static int DEFAULT_SOCKET_TIMEOUT = 30 * 1000; // 30 seconds private Map<String, VertexTaskStats> vertexTaskStatsCache = null; @@ -91,10 +111,9 @@ public class DAGClientTimelineImpl extends DAGClient { public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient) - throws TezException { + throws TezException, IOException { this.appId = appId; this.dagId = dagId; - this.conf = conf; this.frameworkClient = frameworkClient; String scheme; @@ -111,8 +130,28 @@ public class DAGClientTimelineImpl extends DAGClient { } baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE); - } + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + UserGroupInformation realUgi = ugi.getRealUser(); + if (realUgi != null) { + authUgi = realUgi; + doAsUser = ugi.getShortUserName(); + } else { + authUgi = ugi; + doAsUser = null; + } + + + if (UserGroupInformation.isSecurityEnabled()) { + authenticator = new KerberosDelegationTokenAuthenticator(); + } else { + authenticator = new PseudoDelegationTokenAuthenticator(); + } + + connConfigurator = newConnConfigurator(conf); + authenticator.setConnectionConfigurator(connConfigurator); + token = new DelegationTokenAuthenticatedURL.Token(); + } @Override public String getExecutionContext() { @@ -125,7 +164,13 @@ public class DAGClientTimelineImpl extends DAGClient { try { appReport = frameworkClient.getApplicationReport(appId); } catch (YarnException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("error getting application report", e); + } } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("error getting application report", e); + } } return appReport; } @@ -412,8 +457,13 @@ public class DAGClientTimelineImpl extends DAGClient { .type(MediaType.APPLICATION_JSON_TYPE) .get(ClientResponse.class); - if (response.getClientResponseStatus() != ClientResponse.Status.OK) { - throw new TezException("Failed to get response from YARN Timeline: url: " + url); + final ClientResponse.Status clientResponseStatus = response.getClientResponseStatus(); + if (clientResponseStatus != ClientResponse.Status.OK) { + if (clientResponseStatus == ClientResponse.Status.UNAUTHORIZED) { + httpClient = null; + } + throw new TezException("Failed to get response from YARN Timeline: url: " + url + + " error: " + clientResponseStatus); } return response.getEntity(JSONObject.class); @@ -423,6 +473,8 @@ public class DAGClientTimelineImpl extends DAGClient { throw new TezException("Error accessing content from YARN Timeline - unexpected response", e); } catch (IllegalArgumentException e) { throw new TezException("Error accessing content from YARN Timeline - invalid url", e); + } catch (IOException e) { + throw new TezException("Error getting http client connection", e); } } @@ -460,15 +512,119 @@ public class DAGClientTimelineImpl extends DAGClient { } } - protected Client getHttpClient() { + protected Client getHttpClient() throws IOException, TezException { if (httpClient == null) { - ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class); - HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(); - httpClient = new Client(new URLConnectionClientHandler(urlFactory), config); + if (UserGroupInformation.isSecurityEnabled()) { + final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + try { + final Token<?> delegationToken = getDelegationToken(currentUser.getUserName()); + currentUser.addToken(delegationToken); + } catch (UndeclaredThrowableException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("exception getting httpclient token", e); + } + } + } + + ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class); + HttpURLConnectionFactory connectionFactory = new TimelineURLConnectionFactory(); + httpClient = new Client(new URLConnectionClientHandler(connectionFactory), clientConfig); } return httpClient; } + private Token<?> getDelegationToken(final String renewer) throws + IOException, TezException { + authUgi.checkTGTAndReloginFromKeytab(); + try { + return authUgi.doAs(new PrivilegedExceptionAction<Token<?>>() { + @Override + public Token<?> run() throws IOException, AuthenticationException { + try { + URI resURI = URI.create(baseUri); + DelegationTokenAuthenticatedURL authUrl = + new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); + return (Token) authUrl.getDelegationToken(resURI.toURL(), token, renewer, doAsUser); + } catch (IllegalArgumentException e) { + throw new IOException("invalid url " + baseUri, e); + } + } + }); + } catch (InterruptedException e) { + throw new TezException(e); + } + } + + private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { + try { + return new DelegationTokenAuthenticatedURL( + authenticator, connConfigurator).openConnection(url, token, + doAsUser); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } catch (AuthenticationException ae) { + throw new IOException(ae); + } + } + + } + + private static ConnectionConfigurator newConnConfigurator(Configuration conf) { + try { + return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot load customized ssl related configuration. " + + "Fallback to system-generic settings.", e); + } + return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; + } + } + + private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = + new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); + return conn; + } + }; + + private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf) + throws IOException, GeneralSecurityException { + final SSLFactory factory; + final SSLSocketFactory sf; + final HostnameVerifier hv; + + factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); + factory.init(); + sf = factory.createSSLSocketFactory(); + hv = factory.getHostnameVerifier(); + + return new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) + throws IOException { + if (conn instanceof HttpsURLConnection) { + HttpsURLConnection c = (HttpsURLConnection) conn; + c.setSSLSocketFactory(sf); + c.setHostnameVerifier(hv); + } + setTimeouts(conn, timeout); + return conn; + } + }; + } + + private static void setTimeouts(URLConnection connection, int socketTimeout) { + connection.setConnectTimeout(socketTimeout); + connection.setReadTimeout(socketTimeout); + } + private static final Map<String, DAGStatusStateProto> dagStateProtoMap = Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{ put("NEW", DAGStatusStateProto.DAG_SUBMITTED); @@ -498,15 +654,6 @@ public class DAGClientTimelineImpl extends DAGClient { }}); - static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory { - @Override - public HttpURLConnection getHttpURLConnection(URL url) throws IOException { - String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" + - URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8"); - return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection(); - } - } - @Override public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions, long timeout) throws IOException, TezException { http://git-wip-us.apache.org/repos/asf/tez/blob/db9864c3/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java index a72b799..aafc28f 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java @@ -53,7 +53,7 @@ public class TestATSHttpClient { } @Test(timeout = 5000) - public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException { + public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException { ApplicationId mockAppId = mock(ApplicationId.class); DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID", new TezConfiguration(), null);
