http://git-wip-us.apache.org/repos/asf/atlas/blob/900bf9eb/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java ---------------------------------------------------------------------- diff --git a/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java b/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java new file mode 100644 index 0000000..d5392b2 --- /dev/null +++ b/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.security; + +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import org.apache.atlas.AtlasException; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.security.authentication.client.Authenticator; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; + +import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH; +import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY; +import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY; +import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY; + +/** + * + */ +public class SecureClientUtils { + + public final static int DEFAULT_SOCKET_TIMEOUT_IN_MSECS = 1 * 60 * 1000; // 1 minute + private static final Logger LOG = LoggerFactory.getLogger(SecureClientUtils.class); + + + public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config, + org.apache.commons.configuration.Configuration clientConfig, String doAsUser, + final UserGroupInformation ugi) { + config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true); + Configuration conf = new Configuration(); + conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, SecurityProperties.SSL_CLIENT_PROPERTIES)); + UserGroupInformation.setConfiguration(conf); + final ConnectionConfigurator connConfigurator = newConnConfigurator(conf); + + Authenticator authenticator = new KerberosDelegationTokenAuthenticator(); + + authenticator.setConnectionConfigurator(connConfigurator); + final DelegationTokenAuthenticator finalAuthenticator = (DelegationTokenAuthenticator) authenticator; + final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token(); + HttpURLConnectionFactory httpURLConnectionFactory = null; + try { + UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser(); + final UserGroupInformation actualUgi = + (ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY) + ? ugiToUse.getRealUser() : ugiToUse; + LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased()); + if (StringUtils.isEmpty(doAsUser)) { + doAsUser = actualUgi.getShortUserName(); + } + LOG.info("doAsUser: {}", doAsUser); + final String finalDoAsUser = doAsUser; + httpURLConnectionFactory = new HttpURLConnectionFactory() { + @Override + public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { + try { + return actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() { + @Override + public HttpURLConnection run() throws Exception { + try { + return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator) + .openConnection(url, token, finalDoAsUser); + } catch (Exception e) { + throw new IOException(e); + } + } + }); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } + } + } + }; + } catch (IOException e) { + LOG.warn("Error obtaining user", e); + } + + return new URLConnectionClientHandler(httpURLConnectionFactory); + } + + private final static ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) throws IOException { + setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT_IN_MSECS); + return conn; + } + }; + + private static ConnectionConfigurator newConnConfigurator(Configuration conf) { + try { + return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT_IN_MSECS, conf); + } catch (Exception e) { + LOG.debug("Cannot load customized ssl related configuration. " + "Fallback to system-generic settings.", e); + return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; + } + } + + private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf) + throws IOException, GeneralSecurityException { + final SSLFactory factory; + final SSLSocketFactory sf; + final HostnameVerifier hv; + + factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); + factory.init(); + sf = factory.createSSLSocketFactory(); + hv = factory.getHostnameVerifier(); + + return new ConnectionConfigurator() { + @Override + public HttpURLConnection configure(HttpURLConnection conn) throws IOException { + if (conn instanceof HttpsURLConnection) { + HttpsURLConnection c = (HttpsURLConnection) conn; + c.setSSLSocketFactory(sf); + c.setHostnameVerifier(hv); + } + setTimeouts(conn, timeout); + return conn; + } + }; + } + + private static void setTimeouts(URLConnection connection, int socketTimeout) { + connection.setConnectTimeout(socketTimeout); + connection.setReadTimeout(socketTimeout); + } + + private static File getSSLClientFile() throws AtlasException { + String confLocation = System.getProperty("atlas.conf"); + File sslDir; + try { + if (confLocation == null) { + String persistDir = null; + URL resource = SecureClientUtils.class.getResource("/"); + if (resource != null) { + persistDir = resource.toURI().getPath(); + } + assert persistDir != null; + sslDir = new File(persistDir); + } else { + sslDir = new File(confLocation); + } + LOG.info("ssl-client.xml will be created in {}", sslDir); + } catch (Exception e) { + throw new AtlasException("Failed to find client configuration directory", e); + } + return new File(sslDir, SecurityProperties.SSL_CLIENT_PROPERTIES); + } + + public static void persistSSLClientConfiguration(org.apache.commons.configuration.Configuration clientConfig) + throws AtlasException, IOException { + //trust settings + Configuration configuration = new Configuration(false); + File sslClientFile = getSSLClientFile(); + if (!sslClientFile.exists()) { + configuration.set("ssl.client.truststore.type", "jks"); + configuration.set("ssl.client.truststore.location", clientConfig.getString(TRUSTSTORE_FILE_KEY)); + if (clientConfig.getBoolean(CLIENT_AUTH_KEY, false)) { + // need to get client key properties + configuration.set("ssl.client.keystore.location", clientConfig.getString(KEYSTORE_FILE_KEY)); + configuration.set("ssl.client.keystore.type", "jks"); + } + // add the configured credential provider + configuration.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + clientConfig.getString(CERT_STORES_CREDENTIAL_PROVIDER_PATH)); + String hostnameVerifier = clientConfig.getString(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY); + if (hostnameVerifier != null) { + configuration.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, hostnameVerifier); + } + + configuration.writeXml(new FileWriter(sslClientFile)); + } + } + + public static URLConnectionClientHandler getUrlConnectionClientHandler() { + return new URLConnectionClientHandler(new HttpURLConnectionFactory() { + @Override + public HttpURLConnection getHttpURLConnection(URL url) + throws IOException { + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + + if (connection instanceof HttpsURLConnection) { + LOG.debug("Attempting to configure HTTPS connection using client " + + "configuration"); + final SSLFactory factory; + final SSLSocketFactory sf; + final HostnameVerifier hv; + + try { + Configuration conf = new Configuration(); + conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, SecurityProperties.SSL_CLIENT_PROPERTIES)); + UserGroupInformation.setConfiguration(conf); + + HttpsURLConnection c = (HttpsURLConnection) connection; + factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); + factory.init(); + sf = factory.createSSLSocketFactory(); + hv = factory.getHostnameVerifier(); + c.setSSLSocketFactory(sf); + c.setHostnameVerifier(hv); + } catch (Exception e) { + LOG.info("Unable to configure HTTPS connection from " + + "configuration. Leveraging JDK properties."); + } + } + return connection; + } + }); + } +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/900bf9eb/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 2ff0674..7f9f4b8 100755 --- a/client/pom.xml +++ b/client/pom.xml @@ -24,58 +24,28 @@ <groupId>org.apache.atlas</groupId> <version>0.8.2-SNAPSHOT</version> </parent> + + <!-- Sub modules --> + <modules> + <module>common</module> + <module>client-v1</module> + <module>client-v2</module> + </modules> + <artifactId>atlas-client</artifactId> <description>Apache Atlas Client</description> <name>Apache Atlas Client</name> - <packaging>jar</packaging> + <packaging>pom</packaging> <dependencies> <dependency> - <groupId>org.apache.atlas</groupId> - <artifactId>atlas-typesystem</artifactId> - </dependency> - - <!-- supports simple auth handler --> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - </dependency> - - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> </dependency> - <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.atlas</groupId> - <artifactId>atlas-typesystem</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/atlas/blob/900bf9eb/client/src/main/assembly/all-jar.xml ---------------------------------------------------------------------- diff --git a/client/src/main/assembly/all-jar.xml b/client/src/main/assembly/all-jar.xml new file mode 100644 index 0000000..8bc9eee --- /dev/null +++ b/client/src/main/assembly/all-jar.xml @@ -0,0 +1,35 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>all-jar</id> + <formats> + <format>jar</format> <!-- the result is a jar file --> + </formats> + + <includeBaseDirectory>false</includeBaseDirectory> <!-- strip the module prefixes --> + + <dependencySets> + <dependencySet> + <unpack>true</unpack> <!-- unpack , then repack the jars --> + <useTransitiveDependencies>false</useTransitiveDependencies> <!-- do not pull in any transitive dependencies --> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/900bf9eb/client/src/main/java/org/apache/atlas/AtlasAdminClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java deleted file mode 100644 index f334f6c..0000000 --- a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas; - -import org.apache.atlas.model.metrics.AtlasMetrics; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.utils.AuthenticationUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.configuration.Configuration; - -import java.util.Arrays; - - -/** - * An application that allows users to run admin commands against an Atlas server. - * - * The application uses {@link AtlasClient} to send REST requests to the Atlas server. The details of connections - * and other configuration is specified in the Atlas properties file. - * Exit status of the application will be as follows: - * <li>0: successful execution</li> - * <li>1: error in options used for the application</li> - * <li>-1/255: application error</li> - */ -public class AtlasAdminClient { - - private static final Option STATUS = new Option("status", false, "Get the status of an atlas instance"); - private static final Option STATS = new Option("stats", false, "Get the metrics of an atlas instance"); - private static final Options OPTIONS = new Options(); - - private static final int INVALID_OPTIONS_STATUS = 1; - private static final int PROGRAM_ERROR_STATUS = -1; - - static { - OPTIONS.addOption(STATUS); - OPTIONS.addOption(STATS); - } - - public static void main(String[] args) throws AtlasException, ParseException { - AtlasAdminClient atlasAdminClient = new AtlasAdminClient(); - int result = atlasAdminClient.run(args); - System.exit(result); - } - - private int run(String[] args) throws AtlasException { - CommandLine commandLine = parseCommandLineOptions(args); - Configuration configuration = ApplicationProperties.get(); - String[] atlasServerUri = configuration.getStringArray(AtlasConstants.ATLAS_REST_ADDRESS_KEY); - - if (atlasServerUri == null || atlasServerUri.length == 0) { - atlasServerUri = new String[] { AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS }; - } - - AtlasClient atlasClient = null; - if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); - atlasClient = new AtlasClient(atlasServerUri, basicAuthUsernamePassword); - } else { - atlasClient = new AtlasClient(atlasServerUri); - } - return handleCommand(commandLine, atlasServerUri, atlasClient); - } - - private int handleCommand(CommandLine commandLine, String[] atlasServerUri, AtlasClient atlasClient) { - int cmdStatus = PROGRAM_ERROR_STATUS; - if (commandLine.hasOption(STATUS.getOpt())) { - try { - System.out.println(atlasClient.getAdminStatus()); - cmdStatus = 0; - } catch (AtlasServiceException e) { - System.err.println("Could not retrieve status of the server at " + Arrays.toString(atlasServerUri)); - printStandardHttpErrorDetails(e); - } - } else if (commandLine.hasOption(STATS.getOpt())) { - try { - AtlasMetrics atlasMetrics = atlasClient.getAtlasMetrics(); - String json = AtlasType.toJson(atlasMetrics); - System.out.println(json); - cmdStatus = 0; - } catch (AtlasServiceException e) { - System.err.println("Could not retrieve metrics of the server at " + Arrays.toString(atlasServerUri)); - printStandardHttpErrorDetails(e); - } - } else { - System.err.println("Unsupported option. Refer to usage for valid options."); - printUsage(INVALID_OPTIONS_STATUS); - } - return cmdStatus; - } - - private void printStandardHttpErrorDetails(AtlasServiceException e) { - System.err.println("Error details: "); - System.err.println("HTTP Status: " + e.getStatus().getStatusCode() + "," - + e.getStatus().getReasonPhrase()); - System.err.println("Exception message: " + e.getMessage()); - } - - private CommandLine parseCommandLineOptions(String[] args) { - if (args.length == 0) { - printUsage(INVALID_OPTIONS_STATUS); - } - CommandLineParser parser = new GnuParser(); - CommandLine commandLine = null; - try { - commandLine = parser.parse(OPTIONS, args); - } catch (ParseException e) { - System.err.println("Could not parse command line options."); - printUsage(INVALID_OPTIONS_STATUS); - } - return commandLine; - } - - private void printUsage(int statusCode) { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.printHelp("atlas_admin.py", OPTIONS); - System.exit(statusCode); - } - -} http://git-wip-us.apache.org/repos/asf/atlas/blob/900bf9eb/client/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/src/main/java/org/apache/atlas/AtlasBaseClient.java deleted file mode 100644 index 602831a..0000000 --- a/client/src/main/java/org/apache/atlas/AtlasBaseClient.java +++ /dev/null @@ -1,620 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas; - -import com.google.common.annotations.VisibleForTesting; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; -import com.sun.jersey.api.json.JSONConfiguration; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -import org.apache.atlas.model.metrics.AtlasMetrics; -import org.apache.atlas.security.SecureClientUtils; -import org.apache.atlas.utils.AuthenticationUtil; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.Cookie; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import java.io.IOException; -import java.net.ConnectException; -import java.util.List; -import java.util.Map; - -import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; - -public abstract class AtlasBaseClient { - public static final String BASE_URI = "api/atlas/"; - public static final String TYPES = "types"; - public static final String ADMIN_VERSION = "admin/version"; - public static final String ADMIN_STATUS = "admin/status"; - public static final String ADMIN_METRICS = "admin/metrics"; - public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; - //Admin operations - public static final APIInfo VERSION = new APIInfo(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK); - public static final APIInfo STATUS = new APIInfo(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK); - public static final APIInfo METRICS = new APIInfo(BASE_URI + ADMIN_METRICS, HttpMethod.GET, Response.Status.OK); - static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; - static final String UNKNOWN_STATUS = "Unknown status"; - static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; - // Setting the default value based on testing failovers while client code like quickstart is running. - static final int DEFAULT_NUM_RETRIES = 4; - static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; - // Setting the default value based on testing failovers while client code like quickstart is running. - // With number of retries, this gives a total time of about 20s for the server to start. - static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000; - private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class); - protected WebResource service; - protected Configuration configuration; - private String basicAuthUser; - private String basicAuthPassword; - private AtlasClientContext atlasClientContext; - private boolean retryEnabled = false; - private Cookie cookie = null; - - protected AtlasBaseClient() { - } - - protected AtlasBaseClient(String[] baseUrl, String[] basicAuthUserNamePassword) { - if (basicAuthUserNamePassword != null) { - if (basicAuthUserNamePassword.length > 0) { - this.basicAuthUser = basicAuthUserNamePassword[0]; - } - if (basicAuthUserNamePassword.length > 1) { - this.basicAuthPassword = basicAuthUserNamePassword[1]; - } - } - - initializeState(baseUrl, null, null); - } - - protected AtlasBaseClient(String... baseUrls) throws AtlasException { - this(getCurrentUGI(), baseUrls); - } - - protected AtlasBaseClient(UserGroupInformation ugi, String[] baseUrls) { - this(ugi, ugi.getShortUserName(), baseUrls); - } - - protected AtlasBaseClient(UserGroupInformation ugi, String doAsUser, String[] baseUrls) { - initializeState(baseUrls, ugi, doAsUser); - } - - protected AtlasBaseClient(String[] baseUrls, Cookie cookie) { - this.cookie = cookie; - initializeState(baseUrls, null, null); - } - - @VisibleForTesting - protected AtlasBaseClient(WebResource service, Configuration configuration) { - this.service = service; - this.configuration = configuration; - } - - @VisibleForTesting - protected AtlasBaseClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) { - if (basicAuthUserNamePassword != null) { - if (basicAuthUserNamePassword.length > 0) { - this.basicAuthUser = basicAuthUserNamePassword[0]; - } - if (basicAuthUserNamePassword.length > 1) { - this.basicAuthPassword = basicAuthUserNamePassword[1]; - } - } - - initializeState(configuration, baseUrl, null, null); - } - - public void setCookie(Cookie cookie) { - this.cookie = cookie; - } - - protected static UserGroupInformation getCurrentUGI() throws AtlasException { - try { - return UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new AtlasException(e); - } - } - - void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) { - initializeState(getClientProperties(), baseUrls, ugi, doAsUser); - } - - void initializeState(Configuration configuration, String[] baseUrls, UserGroupInformation ugi, String doAsUser) { - this.configuration = configuration; - Client client = getClient(configuration, ugi, doAsUser); - - if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) { - final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword); - client.addFilter(authFilter); - } - - String activeServiceUrl = determineActiveServiceURL(baseUrls, client); - atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser); - service = client.resource(UriBuilder.fromUri(activeServiceUrl).build()); - } - - @VisibleForTesting - protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) { - DefaultClientConfig config = new DefaultClientConfig(); - // Enable POJO mapping feature - config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE); - int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000); - int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000); - if (configuration.getBoolean(TLS_ENABLED, false)) { - // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced - // to create a - // configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory - try { - SecureClientUtils.persistSSLClientConfiguration(configuration); - } catch (Exception e) { - LOG.info("Error processing client configuration.", e); - } - } - - final URLConnectionClientHandler handler; - - if ((AuthenticationUtil.isKerberosAuthenticationEnabled())) { - handler = SecureClientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi); - } else { - if (configuration.getBoolean(TLS_ENABLED, false)) { - handler = SecureClientUtils.getUrlConnectionClientHandler(); - } else { - handler = new URLConnectionClientHandler(); - } - } - Client client = new Client(handler, config); - client.setReadTimeout(readTimeout); - client.setConnectTimeout(connectTimeout); - return client; - } - - @VisibleForTesting - protected String determineActiveServiceURL(String[] baseUrls, Client client) { - if (baseUrls.length == 0) { - throw new IllegalArgumentException("Base URLs cannot be null or empty"); - } - final String baseUrl; - AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls); - if (atlasServerEnsemble.hasSingleInstance()) { - baseUrl = atlasServerEnsemble.firstURL(); - LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl); - } else { - try { - baseUrl = selectActiveServerAddress(client, atlasServerEnsemble); - } catch (AtlasServiceException e) { - LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e); - throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e); - } - } - return baseUrl; - } - - private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble) - throws AtlasServiceException { - List<String> serverInstances = serverEnsemble.getMembers(); - String activeServerAddress = null; - for (String serverInstance : serverInstances) { - LOG.info("Trying with address {}", serverInstance); - activeServerAddress = getAddressIfActive(client, serverInstance); - if (activeServerAddress != null) { - LOG.info("Found service {} as active service.", serverInstance); - break; - } - } - if (activeServerAddress != null) - return activeServerAddress; - else - throw new AtlasServiceException(STATUS, new RuntimeException("Could not find any active instance")); - } - - private String getAddressIfActive(Client client, String serverInstance) { - String activeServerAddress = null; - for (int i = 0; i < getNumberOfRetries(); i++) { - try { - service = client.resource(UriBuilder.fromUri(serverInstance).build()); - String adminStatus = getAdminStatus(); - if (StringUtils.equals(adminStatus, "ACTIVE")) { - activeServerAddress = serverInstance; - break; - } else { - LOG.info("attempt #{}: Service {} - is not active. status={}", (i + 1), serverInstance, adminStatus); - } - } catch (Exception e) { - LOG.error("attempt #{}: Service {} - could not get status", (i + 1), serverInstance, e); - } - sleepBetweenRetries(); - } - return activeServerAddress; - } - - protected Configuration getClientProperties() { - try { - if (configuration == null) { - configuration = ApplicationProperties.get(); - } - } catch (AtlasException e) { - LOG.error("Exception while loading configuration.", e); - } - return configuration; - } - - public boolean isServerReady() throws AtlasServiceException { - WebResource resource = getResource(VERSION.getPath()); - try { - callAPIWithResource(VERSION, resource, null, JSONObject.class); - return true; - } catch (ClientHandlerException che) { - return false; - } catch (AtlasServiceException ase) { - if (ase.getStatus() != null && ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) { - LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready"); - return false; - } - throw ase; - } - } - - protected WebResource getResource(String path, String... pathParams) { - return getResource(service, path, pathParams); - } - - protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, Class<T> responseType) throws AtlasServiceException { - GenericType<T> genericType = null; - if (responseType != null) { - genericType = new GenericType<>(responseType); - } - return callAPIWithResource(api, resource, requestObject, genericType); - } - - protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, GenericType<T> responseType) throws AtlasServiceException { - ClientResponse clientResponse = null; - int i = 0; - do { - if (LOG.isDebugEnabled()) { - LOG.debug("Calling API [ {} : {} ] {}", api.getMethod(), api.getPath(), requestObject != null ? "<== " + requestObject : ""); - } - - WebResource.Builder requestBuilder = resource.getRequestBuilder(); - - // Set content headers - requestBuilder - .accept(JSON_MEDIA_TYPE) - .type(JSON_MEDIA_TYPE); - - // Set cookie if present - if (cookie != null) { - requestBuilder.cookie(cookie); - } - - clientResponse = requestBuilder.method(api.getMethod(), ClientResponse.class, requestObject); - - if (LOG.isDebugEnabled()) { - LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus()); - } - - if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { - if (null == responseType) { - return null; - } - try { - if (responseType.getRawClass() == JSONObject.class) { - String stringEntity = clientResponse.getEntity(String.class); - try { - JSONObject jsonObject = new JSONObject(stringEntity); - LOG.info("Response = {}", jsonObject); - return (T) jsonObject; - } catch (JSONException e) { - throw new AtlasServiceException(api, e); - } - } else { - T entity = clientResponse.getEntity(responseType); - return entity; - } - } catch (ClientHandlerException e) { - throw new AtlasServiceException(api, e); - } - } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { - break; - } else { - LOG.error("Got a service unavailable when calling: {}, will retry..", resource); - sleepBetweenRetries(); - } - - i++; - } while (i < getNumberOfRetries()); - - throw new AtlasServiceException(api, clientResponse); - } - - private WebResource getResource(WebResource service, String path, String... pathParams) { - WebResource resource = service.path(path); - resource = appendPathParams(resource, pathParams); - return resource; - } - - void sleepBetweenRetries() { - try { - Thread.sleep(getSleepBetweenRetriesMs()); - } catch (InterruptedException e) { - LOG.error("Interrupted from sleeping between retries.", e); - } - } - - int getNumberOfRetries() { - return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES); - } - - private int getSleepBetweenRetriesMs() { - return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, AtlasBaseClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS); - } - - /** - * Return status of the service instance the client is pointing to. - * - * @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if - * there is a JSON parse exception - * @throws AtlasServiceException if there is a HTTP error. - */ - public String getAdminStatus() throws AtlasServiceException { - String result = AtlasBaseClient.UNKNOWN_STATUS; - WebResource resource = getResource(service, STATUS.getPath()); - JSONObject response = callAPIWithResource(STATUS, resource, null, JSONObject.class); - try { - result = response.getString("Status"); - } catch (JSONException e) { - LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e); - } - return result; - } - - /** - * @return Return metrics of the service instance the client is pointing to - * @throws AtlasServiceException - */ - public AtlasMetrics getAtlasMetrics() throws AtlasServiceException { - return callAPI(METRICS, AtlasMetrics.class, null); - } - - boolean isRetryableException(ClientHandlerException che) { - return che.getCause().getClass().equals(IOException.class) - || che.getCause().getClass().equals(ConnectException.class); - } - - void handleClientHandlerException(ClientHandlerException che) { - if (isRetryableException(che)) { - atlasClientContext.getClient().destroy(); - LOG.warn("Destroyed current context while handling ClientHandlerEception."); - LOG.warn("Will retry and create new context."); - sleepBetweenRetries(); - initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(), - atlasClientContext.getDoAsUser()); - return; - } - throw che; - } - - @VisibleForTesting - JSONObject callAPIWithRetries(APIInfo api, Object requestObject, ResourceCreator resourceCreator) - throws AtlasServiceException { - for (int i = 0; i < getNumberOfRetries(); i++) { - WebResource resource = resourceCreator.createResource(); - try { - LOG.debug("Using resource {} for {} times", resource.getURI(), i + 1); - return callAPIWithResource(api, resource, requestObject, JSONObject.class); - } catch (ClientHandlerException che) { - if (i == (getNumberOfRetries() - 1)) { - throw che; - } - LOG.warn("Handled exception in calling api {}", api.getPath(), che); - LOG.warn("Exception's cause: {}", che.getCause().getClass()); - handleClientHandlerException(che); - } - } - throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries.")); - } - - public <T> T callAPI(APIInfo api, Class<T> responseType, Object requestObject, String... params) - throws AtlasServiceException { - return callAPIWithResource(api, getResource(api, params), requestObject, responseType); - } - - public <T> T callAPI(APIInfo api, GenericType<T> responseType, Object requestObject, String... params) - throws AtlasServiceException { - return callAPIWithResource(api, getResource(api, params), requestObject, responseType); - } - - - public <T> T callAPI(APIInfo api, Class<T> responseType, Object requestBody, - MultivaluedMap<String, String> queryParams, String... params) throws AtlasServiceException { - WebResource resource = getResource(api, queryParams, params); - return callAPIWithResource(api, resource, requestBody, responseType); - } - - public <T> T callAPI(APIInfo api, Class<T> responseType, MultivaluedMap<String, String> queryParams, String... params) - throws AtlasServiceException { - WebResource resource = getResource(api, queryParams, params); - return callAPIWithResource(api, resource, null, responseType); - } - - public <T> T callAPI(APIInfo api, GenericType<T> responseType, MultivaluedMap<String, String> queryParams, String... params) - throws AtlasServiceException { - WebResource resource = getResource(api, queryParams, params); - return callAPIWithResource(api, resource, null, responseType); - } - - protected WebResource getResource(APIInfo api, String... pathParams) { - return getResource(service, api, pathParams); - } - - // Modify URL to include the path params - private WebResource getResource(WebResource service, APIInfo api, String... pathParams) { - WebResource resource = service.path(api.getPath()); - resource = appendPathParams(resource, pathParams); - return resource; - } - - public <T> T callAPI(APIInfo api, Class<T> responseType, MultivaluedMap<String, String> queryParams) - throws AtlasServiceException { - return callAPIWithResource(api, getResource(api, queryParams), null, responseType); - } - - public <T> T callAPI(APIInfo api, Class<T> responseType, String queryParamKey, List<String> queryParamValues) - throws AtlasServiceException { - return callAPIWithResource(api, getResource(api, queryParamKey, queryParamValues), null, responseType); - } - - private WebResource getResource(APIInfo api, String queryParamKey, List<String> queryParamValues) { - WebResource resource = service.path(api.getPath()); - for (String queryParamValue : queryParamValues) { - if (StringUtils.isNotBlank(queryParamKey) && StringUtils.isNotBlank(queryParamValue)) { - resource = resource.queryParam(queryParamKey, queryParamValue); - } - } - return resource; - } - - protected WebResource getResource(APIInfo api, MultivaluedMap<String, String> queryParams, String... pathParams) { - WebResource resource = service.path(api.getPath()); - resource = appendPathParams(resource, pathParams); - resource = appendQueryParams(queryParams, resource); - return resource; - } - - private WebResource appendPathParams(WebResource resource, String[] pathParams) { - if (pathParams != null) { - for (String pathParam : pathParams) { - resource = resource.path(pathParam); - } - } - return resource; - } - - protected WebResource getResource(APIInfo api, MultivaluedMap<String, String> queryParams) { - return getResource(service, api, queryParams); - } - - // Modify URL to include the query params - private WebResource getResource(WebResource service, APIInfo api, MultivaluedMap<String, String> queryParams) { - WebResource resource = service.path(api.getPath()); - resource = appendQueryParams(queryParams, resource); - return resource; - } - - private WebResource appendQueryParams(MultivaluedMap<String, String> queryParams, WebResource resource) { - if (null != queryParams && !queryParams.isEmpty()) { - for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { - for (String value : entry.getValue()) { - if (StringUtils.isNotBlank(value)) { - resource = resource.queryParam(entry.getKey(), value); - } - } - } - } - return resource; - } - - protected APIInfo updatePathParameters(APIInfo apiInfo, String... params) { - return new APIInfo(String.format(apiInfo.getPath(), params), apiInfo.getMethod(), apiInfo.getExpectedStatus()); - } - - @VisibleForTesting - void setConfiguration(Configuration configuration) { - this.configuration = configuration; - } - - @VisibleForTesting - void setService(WebResource resource) { - this.service = resource; - } - - - public static class APIInfo { - private final String method; - private final String path; - private final Response.Status status; - - public APIInfo(String path, String method, Response.Status status) { - this.path = path; - this.method = method; - this.status = status; - } - - public String getMethod() { - return method; - } - - public String getPath() { - return path; - } - - public Response.Status getExpectedStatus() { - return status; - } - } - - /** - * A class to capture input state while creating the client. - * - * The information here will be reused when the client is re-initialized on switch-over - * in case of High Availability. - */ - private class AtlasClientContext { - private String[] baseUrls; - private Client client; - private String doAsUser; - private UserGroupInformation ugi; - - public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) { - this.baseUrls = baseUrls; - this.client = client; - this.ugi = ugi; - this.doAsUser = doAsUser; - } - - public Client getClient() { - return client; - } - - public String[] getBaseUrls() { - return baseUrls; - } - - public String getDoAsUser() { - return doAsUser; - } - - public UserGroupInformation getUgi() { - return ugi; - } - } -}
