ATLAS-1267: Client for V2 APIs and TypesREST integration tests Signed-off-by: Madhan Neethiraj <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/758b3d4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/758b3d4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/758b3d4d Branch: refs/heads/master Commit: 758b3d4df5033d60b5fa42a15d5bc4ee196c89bc Parents: f638823 Author: apoorvnaik <[email protected]> Authored: Sun Nov 13 21:36:35 2016 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Mon Nov 14 01:44:02 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasBaseClient.java | 541 +++++++++++++++++++ .../main/java/org/apache/atlas/AtlasClient.java | 492 +++-------------- .../org/apache/atlas/AtlasEntitiesClientV2.java | 136 +++++ .../org/apache/atlas/AtlasServiceException.java | 19 + .../org/apache/atlas/AtlasTypedefClientV2.java | 167 ++++++ .../java/org/apache/atlas/AtlasClientTest.java | 61 ++- .../atlas/web/resources/TypesResource.java | 1 + .../web/resources/TypedefsJerseyResourceIT.java | 347 ++++++++++++ .../web/resources/TypesJerseyResourceIT.java | 1 + .../security/NegativeSSLAndKerberosTest.java | 10 +- .../atlas/web/security/SSLAndKerberosTest.java | 20 +- .../org/apache/atlas/web/security/SSLTest.java | 11 +- 12 files changed, 1355 insertions(+), 451 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/src/main/java/org/apache/atlas/AtlasBaseClient.java new file mode 100644 index 0000000..04a418a --- /dev/null +++ b/client/src/main/java/org/apache/atlas/AtlasBaseClient.java @@ -0,0 +1,541 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import org.apache.atlas.security.SecureClientUtils; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.net.ConnectException; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; + +public abstract class AtlasBaseClient { + public static final String BASE_URI = "api/atlas/"; + public static final String TYPES = "types"; + public static final String ADMIN_VERSION = "admin/version"; + public static final String ADMIN_STATUS = "admin/status"; + public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; + //Admin operations + public static final APIInfo VERSION = new APIInfo(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK); + public static final APIInfo STATUS = new APIInfo(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK); + static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; + static final String UNKNOWN_STATUS = "Unknown status"; + static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; + // Setting the default value based on testing failovers while client code like quickstart is running. + static final int DEFAULT_NUM_RETRIES = 4; + static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; + // Setting the default value based on testing failovers while client code like quickstart is running. + // With number of retries, this gives a total time of about 20s for the server to start. + static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000; + private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class); + protected WebResource service; + protected Configuration configuration; + private String basicAuthUser; + private String basicAuthPassword; + private AtlasClientContext atlasClientContext; + private boolean retryEnabled = false; + + protected AtlasBaseClient() {} + + protected AtlasBaseClient(String[] baseUrl, String[] basicAuthUserNamePassword) { + if (basicAuthUserNamePassword != null) { + if (basicAuthUserNamePassword.length > 0) { + this.basicAuthUser = basicAuthUserNamePassword[0]; + } + if (basicAuthUserNamePassword.length > 1) { + this.basicAuthPassword = basicAuthUserNamePassword[1]; + } + } + + initializeState(baseUrl, null, null); + } + + protected AtlasBaseClient(String... baseUrls) throws AtlasException { + this(getCurrentUGI(), baseUrls); + } + + protected AtlasBaseClient(UserGroupInformation ugi, String[] baseUrls) { + this(ugi, ugi.getShortUserName(), baseUrls); + } + + protected AtlasBaseClient(UserGroupInformation ugi, String doAsUser, String[] baseUrls) { + initializeState(baseUrls, ugi, doAsUser); + } + + @VisibleForTesting + protected AtlasBaseClient(WebResource service, Configuration configuration) { + this.service = service; + this.configuration = configuration; + } + + @VisibleForTesting + protected AtlasBaseClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) { + if (basicAuthUserNamePassword != null) { + if (basicAuthUserNamePassword.length > 0) { + this.basicAuthUser = basicAuthUserNamePassword[0]; + } + if (basicAuthUserNamePassword.length > 1) { + this.basicAuthPassword = basicAuthUserNamePassword[1]; + } + } + + initializeState(configuration, baseUrl, null, null); + } + + protected static UserGroupInformation getCurrentUGI() throws AtlasException { + try { + return UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new AtlasException(e); + } + } + + void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) { + initializeState(getClientProperties(), baseUrls, ugi, doAsUser); + } + + void initializeState(Configuration configuration, String[] baseUrls, UserGroupInformation ugi, String doAsUser) { + this.configuration = configuration; + Client client = getClient(configuration, ugi, doAsUser); + + if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) { + final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword); + client.addFilter(authFilter); + } + + String activeServiceUrl = determineActiveServiceURL(baseUrls, client); + atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser); + service = client.resource(UriBuilder.fromUri(activeServiceUrl).build()); + } + + @VisibleForTesting + protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) { + DefaultClientConfig config = new DefaultClientConfig(); + // Enable POJO mapping feature + config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE); + int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000);; + int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000); + if (configuration.getBoolean(TLS_ENABLED, false)) { + // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced + // to create a + // configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory + try { + SecureClientUtils.persistSSLClientConfiguration(configuration); + } catch (Exception e) { + LOG.info("Error processing client configuration.", e); + } + } + + final URLConnectionClientHandler handler; + + if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) { + if (configuration.getBoolean(TLS_ENABLED, false)) { + handler = SecureClientUtils.getUrlConnectionClientHandler(); + } else { + handler = new URLConnectionClientHandler(); + } + } else { + handler = SecureClientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi); + } + Client client = new Client(handler, config); + client.setReadTimeout(readTimeout); + client.setConnectTimeout(connectTimeout); + return client; + } + + @VisibleForTesting + protected String determineActiveServiceURL(String[] baseUrls, Client client) { + if (baseUrls.length == 0) { + throw new IllegalArgumentException("Base URLs cannot be null or empty"); + } + final String baseUrl; + AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls); + if (atlasServerEnsemble.hasSingleInstance()) { + baseUrl = atlasServerEnsemble.firstURL(); + LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl); + } else { + try { + baseUrl = selectActiveServerAddress(client, atlasServerEnsemble); + } catch (AtlasServiceException e) { + LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e); + throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e); + } + } + return baseUrl; + } + + private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble) + throws AtlasServiceException { + List<String> serverInstances = serverEnsemble.getMembers(); + String activeServerAddress = null; + for (String serverInstance : serverInstances) { + LOG.info("Trying with address {}", serverInstance); + activeServerAddress = getAddressIfActive(client, serverInstance); + if (activeServerAddress != null) { + LOG.info("Found service {} as active service.", serverInstance); + break; + } + } + if (activeServerAddress != null) + return activeServerAddress; + else + throw new AtlasServiceException(STATUS, new RuntimeException("Could not find any active instance")); + } + + private String getAddressIfActive(Client client, String serverInstance) { + String activeServerAddress = null; + for (int i = 0; i < getNumberOfRetries(); i++) { + try { + WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build()); + String adminStatus = getAdminStatus(service); + if (StringUtils.equals(adminStatus, "ACTIVE")) { + activeServerAddress = serverInstance; + break; + } else { + LOG.info("attempt #{}: Service {} - is not active. status={}", (i+1), serverInstance, adminStatus); + } + } catch (Exception e) { + LOG.error("attempt #{}: Service {} - could not get status", (i+1), serverInstance, e); + } + sleepBetweenRetries(); + } + return activeServerAddress; + } + + protected Configuration getClientProperties() { + try { + if (configuration == null) { + configuration = ApplicationProperties.get(); + } + } catch (AtlasException e) { + LOG.error("Exception while loading configuration.", e); + } + return configuration; + } + + public boolean isServerReady() throws AtlasServiceException { + WebResource resource = getResource(VERSION.getPath()); + try { + callAPIWithResource(VERSION, resource, null, JSONObject.class); + return true; + } catch (ClientHandlerException che) { + return false; + } catch (AtlasServiceException ase) { + if (ase.getStatus() != null && ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) { + LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready"); + return false; + } + throw ase; + } + } + + protected WebResource getResource(String path, String... pathParams) { + return getResource(service, path, pathParams); + } + + protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, Class<T> responseType) throws AtlasServiceException { + ClientResponse clientResponse = null; + int i = 0; + do { + clientResponse = resource + .accept(JSON_MEDIA_TYPE) + .type(JSON_MEDIA_TYPE) + .method(api.getMethod(), ClientResponse.class, requestObject); + + LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus()); + if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { + if (null == responseType) { + LOG.warn("No response type specified, returning null"); + return null; + } + try { + if (responseType == JSONObject.class) { + String stringEntity = clientResponse.getEntity(String.class); + try { + return (T) new JSONObject(stringEntity); + } catch (JSONException e) { + throw new AtlasServiceException(api, e); + } + } else { + return clientResponse.getEntity(responseType); + } + } catch (ClientHandlerException e) { + throw new AtlasServiceException(api, e); + } + } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { + break; + } else { + LOG.error("Got a service unavailable when calling: {}, will retry..", resource); + sleepBetweenRetries(); + } + + i++; + } while (i < getNumberOfRetries()); + + throw new AtlasServiceException(api, clientResponse); + } + + private WebResource getResource(WebResource service, String path, String... pathParams) { + WebResource resource = service.path(path); + if (pathParams != null) { + for (String pathParam : pathParams) { + resource = resource.path(pathParam); + } + } + return resource; + } + + void sleepBetweenRetries() { + try { + Thread.sleep(getSleepBetweenRetriesMs()); + } catch (InterruptedException e) { + LOG.error("Interrupted from sleeping between retries.", e); + } + } + + int getNumberOfRetries() { + return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES); + } + + private int getSleepBetweenRetriesMs() { + return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, AtlasBaseClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS); + } + + /** + * Return status of the service instance the client is pointing to. + * + * @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if + * there is a JSON parse exception + * @throws AtlasServiceException if there is a HTTP error. + */ + public String getAdminStatus() throws AtlasServiceException { + return getAdminStatus(service); + } + + private String getAdminStatus(WebResource service) throws AtlasServiceException { + String result = AtlasBaseClient.UNKNOWN_STATUS; + WebResource resource = getResource(service, STATUS.getPath()); + JSONObject response = callAPIWithResource(STATUS, resource, null, JSONObject.class); + try { + result = response.getString("Status"); + } catch (JSONException e) { + LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e); + } + return result; + } + + boolean isRetryableException(ClientHandlerException che) { + return che.getCause().getClass().equals(IOException.class) + || che.getCause().getClass().equals(ConnectException.class); + } + + void handleClientHandlerException(ClientHandlerException che) { + if (isRetryableException(che)) { + atlasClientContext.getClient().destroy(); + LOG.warn("Destroyed current context while handling ClientHandlerEception."); + LOG.warn("Will retry and create new context."); + sleepBetweenRetries(); + initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(), + atlasClientContext.getDoAsUser()); + return; + } + throw che; + } + + public boolean isRetryEnabled() { + return retryEnabled; + } + + public void setRetryEnabled(boolean retryEnabled) { + this.retryEnabled = retryEnabled; + } + + @VisibleForTesting + JSONObject callAPIWithRetries(APIInfo api, Object requestObject, ResourceCreator resourceCreator) + throws AtlasServiceException { + for (int i = 0; i < getNumberOfRetries(); i++) { + WebResource resource = resourceCreator.createResource(); + try { + LOG.debug("Using resource {} for {} times", resource.getURI(), i); + JSONObject result = callAPIWithResource(api, resource, requestObject); + return result; + } catch (ClientHandlerException che) { + if (i == (getNumberOfRetries() - 1)) { + throw che; + } + LOG.warn("Handled exception in calling api {}", api.getPath(), che); + LOG.warn("Exception's cause: {}", che.getCause().getClass()); + handleClientHandlerException(che); + } + } + throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries.")); + } + + protected JSONObject callAPIWithResource(APIInfo api, WebResource resource, Object requestObject) + throws AtlasServiceException { + return callAPIWithResource(api, resource, requestObject, JSONObject.class); + } + + protected JSONObject callAPI(final APIInfo api, Object requestObject, final String... pathParams) + throws AtlasServiceException { + return callAPIWithRetries(api, requestObject, new ResourceCreator() { + @Override + public WebResource createResource() { + return getResource(api, pathParams); + } + }); + } + + protected <T> T callAPI(APIInfo api, Object requestObject, Class<T> responseType, String... params) + throws AtlasServiceException { + return callAPIWithResource(api, getResource(api, params), requestObject, responseType); + } + + protected WebResource getResource(APIInfo api, String... pathParams) { + return getResource(service, api, pathParams); + } + + // Modify URL to include the path params + private WebResource getResource(WebResource service, APIInfo api, String... pathParams) { + WebResource resource = service.path(api.getPath()); + if (pathParams != null) { + for (String pathParam : pathParams) { + resource = resource.path(pathParam); + } + } + return resource; + } + + protected <T> T callAPI(APIInfo api, Object requestObject, Class<T> responseType, Map<String, String> queryParams) + throws AtlasServiceException { + return callAPIWithResource(api, getResource(api, queryParams), requestObject, responseType); + } + + protected WebResource getResource(APIInfo api, Map<String, String> queryParams) { + return getResource(service, api, queryParams); + } + + // Modify URL to include the query params + private WebResource getResource(WebResource service, APIInfo api, Map<String, String> queryParams) { + WebResource resource = service.path(api.getPath()); + if (null != queryParams && !queryParams.isEmpty()) { + for (Map.Entry<String, String> entry : queryParams.entrySet()) { + resource = resource.queryParam(entry.getKey(), entry.getValue()); + } + } + return resource; + } + + protected APIInfo formatPath(APIInfo apiInfo, String ... params) { + return new APIInfo(String.format(apiInfo.getPath(), params), apiInfo.getMethod(), apiInfo.getExpectedStatus()); + } + + @VisibleForTesting + void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } + + @VisibleForTesting + void setService(WebResource resource) { + this.service = resource; + } + + + public static class APIInfo { + private final String method; + private final String path; + private final Response.Status status; + + APIInfo(String path, String method, Response.Status status) { + this.path = path; + this.method = method; + this.status = status; + } + + public String getMethod() { + return method; + } + + public String getPath() { + return path; + } + + public Response.Status getExpectedStatus() { + return status; + } + } + + /** + * A class to capture input state while creating the client. + * + * The information here will be reused when the client is re-initialized on switch-over + * in case of High Availability. + */ + private class AtlasClientContext { + private String[] baseUrls; + private Client client; + private String doAsUser; + private UserGroupInformation ugi; + + public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) { + this.baseUrls = baseUrls; + this.client = client; + this.ugi = ugi; + this.doAsUser = doAsUser; + } + + public Client getClient() { + return client; + } + + public String[] getBaseUrls() { + return baseUrls; + } + + public String getDoAsUser() { + return doAsUser; + } + + public UserGroupInformation getUgi() { + return ugi; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 6c13ec8..70e1a0d 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -20,16 +20,10 @@ package org.apache.atlas; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; + import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -import org.apache.atlas.security.SecureClientUtils; + +import org.apache.atlas.type.AtlasType; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.TypesDef; @@ -40,7 +34,6 @@ import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.utils.TypesUtil; -import org.apache.atlas.utils.AuthenticationUtil; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -50,12 +43,6 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import java.io.IOException; -import java.net.ConnectException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -63,12 +50,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; /** * Client for metadata. */ -public class AtlasClient { +public class AtlasClient extends AtlasBaseClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class); public static final String TYPE = "type"; @@ -90,10 +78,6 @@ public class AtlasClient { public static final String START_KEY = "startKey"; public static final String NUM_RESULTS = "count"; - public static final String BASE_URI = "api/atlas/"; - public static final String ADMIN_VERSION = "admin/version"; - public static final String ADMIN_STATUS = "admin/status"; - public static final String TYPES = "types"; public static final String URI_ENTITY = "entities"; public static final String URI_ENTITY_AUDIT = "audit"; public static final String URI_SEARCH = "discovery/search"; @@ -128,39 +112,11 @@ public class AtlasClient { public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable"; public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; - public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String UNKNOWN_STATUS = "Unknown status"; - public static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; - // Setting the default value based on testing failovers while client code like quickstart is running. - public static final int DEFAULT_NUM_RETRIES = 4; - public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; - - public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; - - // Setting the default value based on testing failovers while client code like quickstart is running. - // With number of retries, this gives a total time of about 20s for the server to start. - public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000; - - private WebResource service; - private AtlasClientContext atlasClientContext; - private Configuration configuration; - private String basicAuthUser; - private String basicAuthPassword; - - // New constuctor for Basic auth public AtlasClient(String[] baseUrl, String[] basicAuthUserNamepassword) { - if (basicAuthUserNamepassword != null) { - if (basicAuthUserNamepassword.length > 0) { - this.basicAuthUser = basicAuthUserNamepassword[0]; - } - if (basicAuthUserNamepassword.length > 1) { - this.basicAuthPassword = basicAuthUserNamepassword[1]; - } - } - - initializeState(baseUrl, null, null); + super(baseUrl, basicAuthUserNamepassword); } /** @@ -187,14 +143,6 @@ public class AtlasClient { initializeState(baseUrls, ugi, doAsUser); } - private static UserGroupInformation getCurrentUGI() throws AtlasException { - try { - return UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new AtlasException(e); - } - } - private AtlasClient(UserGroupInformation ugi, String[] baseUrls) { this(ugi, ugi.getShortUserName(), baseUrls); } @@ -204,268 +152,25 @@ public class AtlasClient { //Do nothing } - private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) { - configuration = getClientProperties(); - Client client = getClient(configuration, ugi, doAsUser); - - if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser!=null && basicAuthPassword!=null) { - final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword); - client.addFilter(authFilter); - } - - String activeServiceUrl = determineActiveServiceURL(baseUrls, client); - atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser); - service = client.resource(UriBuilder.fromUri(activeServiceUrl).build()); - } - @VisibleForTesting - protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) { - DefaultClientConfig config = new DefaultClientConfig(); - Configuration clientConfig = null; - int readTimeout = 60000; - int connectTimeout = 60000; - try { - clientConfig = configuration; - if (clientConfig.getBoolean(TLS_ENABLED, false)) { - // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced - // to create a - // configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory - SecureClientUtils.persistSSLClientConfiguration(clientConfig); - } - readTimeout = clientConfig.getInt("atlas.client.readTimeoutMSecs", readTimeout); - connectTimeout = clientConfig.getInt("atlas.client.connectTimeoutMSecs", connectTimeout); - } catch (Exception e) { - LOG.info("Error processing client configuration.", e); - } - - URLConnectionClientHandler handler = null; - - if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) { - if (clientConfig.getBoolean(TLS_ENABLED, false)) { - handler = SecureClientUtils.getUrlConnectionClientHandler(); - } else { - handler = new URLConnectionClientHandler(); - } - } else { - handler = - SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi); - } - Client client = new Client(handler, config); - client.setReadTimeout(readTimeout); - client.setConnectTimeout(connectTimeout); - return client; + public AtlasClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamepassword) { + super(configuration, baseUrl, basicAuthUserNamepassword); } @VisibleForTesting - protected String determineActiveServiceURL(String[] baseUrls, Client client) { - if (baseUrls.length == 0) { - throw new IllegalArgumentException("Base URLs cannot be null or empty"); - } - String baseUrl; - AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls); - if (atlasServerEnsemble.hasSingleInstance()) { - baseUrl = atlasServerEnsemble.firstURL(); - LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl); - return baseUrl; - } else { - try { - baseUrl = selectActiveServerAddress(client, atlasServerEnsemble); - } catch (AtlasServiceException e) { - LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e); - throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e); - } - } - return baseUrl; - } - - private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble) - throws AtlasServiceException { - List<String> serverInstances = serverEnsemble.getMembers(); - String activeServerAddress = null; - for (String serverInstance : serverInstances) { - LOG.info("Trying with address {}", serverInstance); - activeServerAddress = getAddressIfActive(client, serverInstance); - if (activeServerAddress != null) { - LOG.info("Found service {} as active service.", serverInstance); - break; - } - } - if (activeServerAddress != null) - return activeServerAddress; - else - throw new AtlasServiceException(API.STATUS, new RuntimeException("Could not find any active instance")); - } - - private String getAddressIfActive(Client client, String serverInstance) { - String activeServerAddress = null; - for (int i = 0; i < getNumberOfRetries(); i++) { - try { - WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build()); - String adminStatus = getAdminStatus(service); - if (adminStatus.equals("ACTIVE")) { - activeServerAddress = serverInstance; - break; - } else { - LOG.info("Service {} is not active.. will retry.", serverInstance); - } - } catch (Exception e) { - LOG.error("Could not get status from service {} after {} tries.", serverInstance, i, e); - } - sleepBetweenRetries(); - LOG.warn("Service {} is not active.", serverInstance); - } - return activeServerAddress; - } - - private void sleepBetweenRetries(){ - try { - Thread.sleep(getSleepBetweenRetriesMs()); - } catch (InterruptedException e) { - LOG.error("Interrupted from sleeping between retries.", e); - } - } - - private int getSleepBetweenRetriesMs() { - return configuration.getInt(ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, DEFAULT_SLEEP_BETWEEN_RETRIES_MS); - } - - private int getNumberOfRetries() { - return configuration.getInt(ATLAS_CLIENT_HA_RETRIES_KEY, DEFAULT_NUM_RETRIES); + public AtlasClient(Configuration configuration, String... baseUrls) throws AtlasException { + initializeState(configuration, baseUrls, getCurrentUGI(), getCurrentUGI().getShortUserName()); } @VisibleForTesting AtlasClient(WebResource service, Configuration configuration) { - this.service = service; - this.configuration = configuration; - } - - protected Configuration getClientProperties() { - try { - if (configuration == null) { - configuration = ApplicationProperties.get(); - } - } catch (AtlasException e) { - LOG.error("Exception while loading configuration.", e); - } - return configuration; - } - - public boolean isServerReady() throws AtlasServiceException { - WebResource resource = getResource(API.VERSION); - try { - callAPIWithResource(API.VERSION, resource, null); - return true; - } catch (ClientHandlerException che) { - return false; - } catch (AtlasServiceException ase) { - if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) { - LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready"); - return false; - } - throw ase; - } + super(service, configuration); } public WebResource getResource() { return service; } - public static class EntityResult { - private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); - - public static final String OP_CREATED = "created"; - public static final String OP_UPDATED = "updated"; - public static final String OP_DELETED = "deleted"; - - Map<String, List<String>> entities = new HashMap<>(); - - public EntityResult() { - //For gson - } - - public EntityResult(List<String> created, List<String> updated, List<String> deleted) { - add(OP_CREATED, created); - add(OP_UPDATED, updated); - add(OP_DELETED, deleted); - } - - private void add(String type, List<String> list) { - if (list != null && list.size() > 0) { - entities.put(type, list); - } - } - - private List<String> get(String type) { - List<String> list = entities.get(type); - if (list == null) { - list = new ArrayList<>(); - } - return list; - } - - public List<String> getCreatedEntities() { - return get(OP_CREATED); - } - - public List<String> getUpdateEntities() { - return get(OP_UPDATED); - } - - public List<String> getDeletedEntities() { - return get(OP_DELETED); - } - - @Override - public String toString() { - return gson.toJson(this); - } - - public static EntityResult fromString(String json) throws AtlasServiceException { - return gson.fromJson(json, EntityResult.class); - } - } - - /** - * Return status of the service instance the client is pointing to. - * - * @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if there is a JSON parse - * exception - * @throws AtlasServiceException if there is a HTTP error. - */ - public String getAdminStatus() throws AtlasServiceException { - return getAdminStatus(service); - } - - private void handleClientHandlerException(ClientHandlerException che) { - if (isRetryableException(che)) { - atlasClientContext.getClient().destroy(); - LOG.warn("Destroyed current context while handling ClientHandlerEception."); - LOG.warn("Will retry and create new context."); - sleepBetweenRetries(); - initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(), - atlasClientContext.getDoAsUser()); - return; - } - throw che; - } - - private boolean isRetryableException(ClientHandlerException che) { - return che.getCause().getClass().equals(IOException.class) - || che.getCause().getClass().equals(ConnectException.class); - } - - private String getAdminStatus(WebResource service) throws AtlasServiceException { - String result = UNKNOWN_STATUS; - WebResource resource = getResource(service, API.STATUS); - JSONObject response = callAPIWithResource(API.STATUS, resource, null); - try { - result = response.getString(STATUS); - } catch (JSONException e) { - LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e); - } - return result; - } - public enum API { //Admin operations @@ -530,8 +235,63 @@ public class AtlasClient { public String getPath() { return path; } - - public Response.Status getExpectedStatus() { return status; } + + public Response.Status getExpectedStatus() { + return status; + } + } + + public static class EntityResult { + public static final String OP_CREATED = "created"; + public static final String OP_UPDATED = "updated"; + public static final String OP_DELETED = "deleted"; + + Map<String, List<String>> entities = new HashMap<>(); + + public EntityResult() { + //For gson + } + + public EntityResult(List<String> created, List<String> updated, List<String> deleted) { + add(OP_CREATED, created); + add(OP_UPDATED, updated); + add(OP_DELETED, deleted); + } + + private void add(String type, List<String> list) { + if (list != null && list.size() > 0) { + entities.put(type, list); + } + } + + private List<String> get(String type) { + List<String> list = entities.get(type); + if (list == null) { + list = new ArrayList<>(); + } + return list; + } + + public List<String> getCreatedEntities() { + return get(OP_CREATED); + } + + public List<String> getUpdateEntities() { + return get(OP_UPDATED); + } + + public List<String> getDeletedEntities() { + return get(OP_DELETED); + } + + @Override + public String toString() { + return AtlasType.toJson(this); + } + + public static EntityResult fromString(String json) throws AtlasServiceException { + return AtlasType.fromJson(json, EntityResult.class); + } } /** @@ -639,7 +399,7 @@ public class AtlasClient { JSONObject response = callAPIWithRetries(API.LIST_TYPES, null, new ResourceCreator() { @Override public WebResource createResource() { - WebResource resource = getResource(API.LIST_TYPES); + WebResource resource = getResource(API.LIST_TYPES.getPath()); resource = resource.queryParam(TYPE, category.name()); return resource; } @@ -780,27 +540,6 @@ public class AtlasClient { return extractEntityResult(response); } - @VisibleForTesting - JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator) - throws AtlasServiceException { - for (int i = 0; i < getNumberOfRetries(); i++) { - WebResource resource = resourceCreator.createResource(); - try { - LOG.debug("Using resource {} for {} times", resource.getURI(), i); - JSONObject result = callAPIWithResource(api, resource, requestObject); - return result; - } catch (ClientHandlerException che) { - if (i==(getNumberOfRetries()-1)) { - throw che; - } - LOG.warn("Handled exception in calling api {}", api.getPath(), che); - LOG.warn("Exception's cause: {}", che.getCause().getClass()); - handleClientHandlerException(che); - } - } - throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries.")); - } - /** * Supports Partial updates * Updates properties set in the definition for the entity corresponding to guid @@ -1205,93 +944,26 @@ public class AtlasClient { } } - private WebResource getResource(API api, String... pathParams) { - return getResource(service, api, pathParams); - } + // Wrapper methods for compatibility - private WebResource getResource(WebResource service, API api, String... pathParams) { - WebResource resource = service.path(api.getPath()); - if (pathParams != null) { - for (String pathParam : pathParams) { - resource = resource.path(pathParam); - } - } - return resource; + JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject) throws AtlasServiceException { + return callAPIWithResource(toAPIInfo(api), resource, requestObject); } - private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject) - throws AtlasServiceException { - ClientResponse clientResponse = null; - int i = 0; - do { - clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE) - .method(api.getMethod(), ClientResponse.class, requestObject); - - LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus()); - if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { - String responseAsString = clientResponse.getEntity(String.class); - try { - return new JSONObject(responseAsString); - } catch (JSONException e) { - throw new AtlasServiceException(api, e); - } - } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { - break; - } else { - LOG.error("Got a service unavailable when calling: {}, will retry..", resource); - sleepBetweenRetries(); - } - - i++; - } while (i < getNumberOfRetries()); - - throw new AtlasServiceException(api, clientResponse); + WebResource getResource(API api, String ... params) { + return getResource(toAPIInfo(api), params); } - private JSONObject callAPI(final API api, Object requestObject, final String... pathParams) - throws AtlasServiceException { - return callAPIWithRetries(api, requestObject, new ResourceCreator() { - @Override - public WebResource createResource() { - return getResource(api, pathParams); - } - }); + JSONObject callAPI(API api, Object requestObject, String ... params) throws AtlasServiceException { + return callAPI(toAPIInfo(api), requestObject, params); } - /** - * A class to capture input state while creating the client. - * - * The information here will be reused when the client is re-initialized on switch-over - * in case of High Availability. - */ - private class AtlasClientContext { - private String[] baseUrls; - private Client client; - private String doAsUser; - private UserGroupInformation ugi; - - public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) { - this.baseUrls = baseUrls; - this.client = client; - this.ugi = ugi; - this.doAsUser = doAsUser; - } - - public Client getClient() { - return client; - } - - public String[] getBaseUrls() { - return baseUrls; - } - - public String getDoAsUser() { - return doAsUser; - } + JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator) throws AtlasServiceException { + return super.callAPIWithRetries(toAPIInfo(api), requestObject, resourceCreator); + } - public UserGroupInformation getUgi() { - return ugi; - } + private APIInfo toAPIInfo(API api){ + return new APIInfo(api.getPath(), api.getMethod(), api.getExpectedStatus()); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java new file mode 100644 index 0000000..fae4dd8 --- /dev/null +++ b/client/src/main/java/org/apache/atlas/AtlasEntitiesClientV2.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas; + +import com.google.common.annotations.VisibleForTesting; + +import com.sun.jersey.api.client.WebResource; + +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.util.List; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; + +public class AtlasEntitiesClientV2 extends AtlasBaseClient { + + public static final String ENTITY_API = BASE_URI + "v2/entity/"; + public static final String ENTITIES_API = BASE_URI + "v2/entities/"; + + private static final APIInfo GET_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.GET, Response.Status.OK); + private static final APIInfo CREATE_ENTITY = new APIInfo(ENTITY_API, HttpMethod.POST, Response.Status.OK); + private static final APIInfo UPDATE_ENTITY = CREATE_ENTITY; + private static final APIInfo UPDATE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.PUT, Response.Status.OK); + private static final APIInfo DELETE_ENTITY_BY_GUID = new APIInfo(ENTITY_API + "guid/", HttpMethod.DELETE, Response.Status.OK); + private static final APIInfo GET_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.GET, Response.Status.OK); + private static final APIInfo ADD_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.POST, Response.Status.OK); + private static final APIInfo UPDATE_CLASSIFICATIONS = new APIInfo(ENTITY_API + "guid/%s/classifications", HttpMethod.PUT, Response.Status.OK); + private static final APIInfo DELETE_CLASSIFICATION = new APIInfo(ENTITY_API + "guid/%s/classification/%s", HttpMethod.DELETE, Response.Status.OK); + + private static final APIInfo GET_ENTITIES = new APIInfo(ENTITIES_API + "guids/", HttpMethod.GET, Response.Status.OK); + private static final APIInfo CREATE_ENTITIES = new APIInfo(ENTITIES_API, HttpMethod.POST, Response.Status.OK); + private static final APIInfo UPDATE_ENTITIES = CREATE_ENTITIES; + private static final APIInfo DELETE_ENTITIES = new APIInfo(ENTITIES_API + "guids/", HttpMethod.GET, Response.Status.OK); + private static final APIInfo SEARCH_ENTITIES = new APIInfo(ENTITIES_API, HttpMethod.GET, Response.Status.OK); + + public AtlasEntitiesClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { + super(baseUrl, basicAuthUserNamePassword); + } + + public AtlasEntitiesClientV2(String... baseUrls) throws AtlasException { + super(baseUrls); + } + + public AtlasEntitiesClientV2(UserGroupInformation ugi, String doAsUser, String... baseUrls) { + super(ugi, doAsUser, baseUrls); + } + + protected AtlasEntitiesClientV2() { + super(); + } + + @VisibleForTesting + AtlasEntitiesClientV2(WebResource service, Configuration configuration) { + super(service, configuration); + } + + public AtlasEntity getEntityByGuid(String guid) throws AtlasServiceException { + return callAPI(GET_ENTITY_BY_GUID, null, AtlasEntity.class, guid); + } + + public EntityMutationResponse createEntity(AtlasEntity atlasEntity) throws AtlasServiceException { + return callAPI(CREATE_ENTITY, atlasEntity, EntityMutationResponse.class); + } + + public EntityMutationResponse updateEntity(AtlasEntity atlasEntity) throws AtlasServiceException { + return callAPI(UPDATE_ENTITY, atlasEntity, EntityMutationResponse.class); + } + + public EntityMutationResponse updateEntity(String guid, AtlasEntity atlasEntity) throws AtlasServiceException { + return callAPI(UPDATE_ENTITY, atlasEntity, EntityMutationResponse.class, guid); + } + + public AtlasEntity deleteEntityByGuid(String guid) throws AtlasServiceException { + return callAPI(DELETE_ENTITY_BY_GUID, null, AtlasEntity.class, guid); + } + + public AtlasClassifications getClassifications(String guid) throws AtlasServiceException { + return callAPI(formatPath(GET_CLASSIFICATIONS, guid), null, AtlasClassifications.class); + } + + public void addClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException { + callAPI(formatPath(ADD_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class); + } + + public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException { + callAPI(formatPath(UPDATE_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class); + } + + public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException { + callAPI(formatPath(GET_CLASSIFICATIONS, guid), classifications, AtlasClassifications.class); + } + + public void deleteClassification(String guid, String classificationName) throws AtlasServiceException { + callAPI(formatPath(DELETE_CLASSIFICATION, guid, classificationName), null, AtlasClassifications.class); + } + + // Entities operations + public List<AtlasEntity> getEntities(List<String> entityIds) { + // TODO Map the query params correctly + return null; + } + + public List<AtlasEntity> createEntities(List<AtlasEntity> atlasEntities) throws AtlasServiceException { + return (List<AtlasEntity>)callAPI(CREATE_ENTITIES, atlasEntities, List.class); + } + + public List<AtlasEntity> updateEntities(List<AtlasEntity> atlasEntities) throws AtlasServiceException { + return (List<AtlasEntity>)callAPI(UPDATE_ENTITIES, atlasEntities, List.class); + } + + public AtlasEntity.AtlasEntities searchEntities(SearchFilter searchFilter) throws AtlasServiceException { + return callAPI(GET_ENTITIES, null, AtlasEntity.AtlasEntities.class, searchFilter.getParams()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/main/java/org/apache/atlas/AtlasServiceException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasServiceException.java b/client/src/main/java/org/apache/atlas/AtlasServiceException.java index 367d52d..4719e7c 100755 --- a/client/src/main/java/org/apache/atlas/AtlasServiceException.java +++ b/client/src/main/java/org/apache/atlas/AtlasServiceException.java @@ -31,21 +31,40 @@ public class AtlasServiceException extends Exception { super("Metadata service API " + api + " failed", e); } + public AtlasServiceException(AtlasBaseClient.APIInfo api, Exception e) { + super("Metadata service API " + api + " failed", e); + } + public AtlasServiceException(AtlasClient.API api, WebApplicationException e) throws JSONException { this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()), ((JSONObject) e.getResponse().getEntity()).getString("stackTrace")); } + public AtlasServiceException(AtlasBaseClient.APIInfo api, WebApplicationException e) throws JSONException { + this(api, ClientResponse.Status.fromStatusCode(e.getResponse().getStatus()), + ((JSONObject) e.getResponse().getEntity()).getString("stackTrace")); + } + private AtlasServiceException(AtlasClient.API api, ClientResponse.Status status, String response) { super("Metadata service API " + api + " failed with status " + (status != null ? status.getStatusCode() : -1) + " (" + status + ") Response Body (" + response + ")"); this.status = status; } + private AtlasServiceException(AtlasBaseClient.APIInfo api, ClientResponse.Status status, String response) { + super("Metadata service API " + api + " failed with status " + (status != null ? status.getStatusCode() : -1) + + " (" + status + ") Response Body (" + response + ")"); + this.status = status; + } + public AtlasServiceException(AtlasClient.API api, ClientResponse response) { this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class)); } + public AtlasServiceException(AtlasBaseClient.APIInfo api, ClientResponse response) { + this(api, ClientResponse.Status.fromStatusCode(response.getStatus()), response.getEntity(String.class)); + } + public AtlasServiceException(Exception e) { super(e); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/main/java/org/apache/atlas/AtlasTypedefClientV2.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasTypedefClientV2.java b/client/src/main/java/org/apache/atlas/AtlasTypedefClientV2.java new file mode 100644 index 0000000..cf86e7a --- /dev/null +++ b/client/src/main/java/org/apache/atlas/AtlasTypedefClientV2.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas; + +import com.google.common.annotations.VisibleForTesting; + +import com.sun.jersey.api.client.WebResource; + +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; + +public class AtlasTypedefClientV2 extends AtlasBaseClient { + + private static final String BASE_URI = "api/atlas/v2/types/"; + private static final String TYPEDEFS_PATH = BASE_URI + "typedefs/"; + private static final String GET_BY_NAME_TEMPLATE = BASE_URI + "%s/name/%s"; + private static final String GET_BY_GUID_TEMPLATE = BASE_URI + "%s/guid/%s"; + + private static final APIInfo GET_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.GET, Response.Status.OK); + private static final APIInfo CREATE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.POST, Response.Status.OK); + private static final APIInfo UPDATE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.PUT, Response.Status.OK); + private static final APIInfo DELETE_ALL_TYPE_DEFS = new APIInfo(TYPEDEFS_PATH, HttpMethod.DELETE, Response.Status.OK); + + public AtlasTypedefClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { + super(baseUrl, basicAuthUserNamePassword); + } + + public AtlasTypedefClientV2(String... baseUrls) throws AtlasException { + super(baseUrls); + } + + public AtlasTypedefClientV2(UserGroupInformation ugi, String doAsUser, String... baseUrls) { + super(ugi, doAsUser, baseUrls); + } + + protected AtlasTypedefClientV2() { + super(); + } + + @VisibleForTesting + AtlasTypedefClientV2(WebResource service, Configuration configuration) { + super(service, configuration); + } + + /** + * Bulk retrieval API for retrieving all type definitions in Atlas + * + * @return A composite wrapper object with lists of all type definitions + */ + public AtlasTypesDef getAllTypeDefs(SearchFilter searchFilter) throws AtlasServiceException { + return callAPI(GET_ALL_TYPE_DEFS, null, AtlasTypesDef.class, searchFilter.getParams()); + } + + public AtlasEnumDef getEnumByName(final String name) throws AtlasServiceException { + return getTypeDefByName(name, AtlasEnumDef.class); + } + + public AtlasEnumDef getEnumByGuid(final String guid) throws AtlasServiceException { + return getTypeDefByGuid(guid, AtlasEnumDef.class); + } + + public AtlasStructDef getStructByName(final String name) throws AtlasServiceException { + return getTypeDefByName(name, AtlasStructDef.class); + } + + public AtlasStructDef getStructByGuid(final String guid) throws AtlasServiceException { + return getTypeDefByGuid(guid, AtlasStructDef.class); + } + + public AtlasClassificationDef getClassificationByName(final String name) throws AtlasServiceException { + return getTypeDefByName(name, AtlasClassificationDef.class); + } + + public AtlasClassificationDef getClassificationByGuid(final String guid) throws AtlasServiceException { + return getTypeDefByGuid(guid, AtlasClassificationDef.class); + } + + public AtlasEntityDef getEntityByName(final String name) throws AtlasServiceException { + return getTypeDefByName(name, AtlasEntityDef.class); + } + + public AtlasEntityDef getEntityByGuid(final String guid) throws AtlasServiceException { + return getTypeDefByGuid(guid, AtlasEntityDef.class); + } + + /** + * Bulk create APIs for all atlas type definitions, only new definitions will be created. + * Any changes to the existing definitions will be discarded + * + * @param typesDef A composite wrapper object with corresponding lists of the type definition + * @return A composite wrapper object with lists of type definitions that were successfully + * created + */ + public AtlasTypesDef createAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException { + return callAPI(CREATE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class); + } + + /** + * Bulk update API for all types, changes detected in the type definitions would be persisted + * + * @param typesDef A composite object that captures all type definition changes + * @return A composite object with lists of type definitions that were updated + */ + public AtlasTypesDef updateAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException { + return callAPI(UPDATE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class); + } + + /** + * Bulk delete API for all types + * + * @param typesDef A composite object that captures all types to be deleted + */ + public void deleteAtlasTypeDefs(final AtlasTypesDef typesDef) throws AtlasServiceException { + callAPI(DELETE_ALL_TYPE_DEFS, AtlasType.toJson(typesDef), AtlasTypesDef.class); + } + + private <T> T getTypeDefByName(final String name, Class<T> typeDefClass) throws AtlasServiceException { + String atlasPath = getAtlasPath(typeDefClass); + APIInfo apiInfo = new APIInfo(String.format(GET_BY_NAME_TEMPLATE, atlasPath, name), HttpMethod.GET, Response.Status.OK); + return callAPI(apiInfo, null, typeDefClass); + } + + private <T> T getTypeDefByGuid(final String guid, Class<T> typeDefClass) throws AtlasServiceException { + String atlasPath = getAtlasPath(typeDefClass); + APIInfo apiInfo = new APIInfo(String.format(GET_BY_GUID_TEMPLATE, atlasPath, guid), HttpMethod.GET, Response.Status.OK); + return callAPI(apiInfo, null, typeDefClass); + } + + private <T> String getAtlasPath(Class<T> typeDefClass) { + if (typeDefClass.isAssignableFrom(AtlasEnumDef.class)) { + return "enumdef"; + } else if (typeDefClass.isAssignableFrom(AtlasEntityDef.class)) { + return "entitydef"; + } else if (typeDefClass.isAssignableFrom(AtlasClassificationDef.class)) { + return "classificationdef"; + } else if (typeDefClass.isAssignableFrom(AtlasStructDef.class)) { + return "structdef"; + } + // Code should never reach this pion + return ""; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/client/src/test/java/org/apache/atlas/AtlasClientTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java index 77a387f..3a67689 100644 --- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -21,6 +21,7 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; + import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; @@ -32,14 +33,15 @@ import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; @@ -94,6 +96,7 @@ public class AtlasClientTest { JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString()); when(response.getEntity(String.class)).thenReturn(jsonResponse.toString()); + when(response.getLength()).thenReturn(jsonResponse.length()); String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true); when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response); @@ -157,9 +160,15 @@ public class AtlasClientTest { WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}"); + String activeStatus = "{\"Status\":\"Active\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response); +// Fix after AtlasBaseClient +// atlasClient.setService(); + + String status = atlasClient.getAdminStatus(); assertEquals(status, "Active"); } @@ -212,10 +221,14 @@ public class AtlasClientTest { WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); ClientResponse firstResponse = mock(ClientResponse.class); when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(firstResponse.getEntity(String.class)).thenReturn("{\"Status\":\"PASSIVE\"}"); + String passiveStatus = "{\"Status\":\"PASSIVE\"}"; + when(firstResponse.getEntity(String.class)).thenReturn(passiveStatus); + when(firstResponse.getLength()).thenReturn(passiveStatus.length()); ClientResponse secondResponse = mock(ClientResponse.class); when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(secondResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(secondResponse.getEntity(String.class)).thenReturn(activeStatus); + when(secondResponse.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse). thenReturn(secondResponse); @@ -239,7 +252,9 @@ public class AtlasClientTest { when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}"); ClientResponse nextResponse = mock(ClientResponse.class); when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). thenReturn(response).thenReturn(response).thenReturn(nextResponse); @@ -262,13 +277,17 @@ public class AtlasClientTest { when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}"); ClientResponse nextResponse = mock(ClientResponse.class); when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). thenThrow(new ClientHandlerException("Simulating connection exception")). thenReturn(response). thenReturn(nextResponse); AtlasClient atlasClient = new AtlasClient(service, configuration); + atlasClient.setService(service); + atlasClient.setConfiguration(configuration); String serviceURL = atlasClient.determineActiveServiceURL( new String[] {"http://localhost:31000","http://localhost:41000"}, @@ -313,7 +332,9 @@ public class AtlasClientTest { ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). @@ -323,6 +344,9 @@ public class AtlasClientTest { AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000"); + atlasClient.setService(service); + atlasClient.setConfiguration(configuration); + atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); verify(client).destroy(); @@ -343,7 +367,9 @@ public class AtlasClientTest { ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). @@ -354,6 +380,9 @@ public class AtlasClientTest { AtlasClient atlasClient = getClientForTest("http://localhost:31000"); + atlasClient.setService(resourceObject); + atlasClient.setConfiguration(configuration); + atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); verify(client).destroy(); @@ -379,7 +408,9 @@ public class AtlasClientTest { ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); - when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + String activeStatus = "{\"Status\":\"ACTIVE\"}"; + when(response.getEntity(String.class)).thenReturn(activeStatus); + when(response.getLength()).thenReturn(activeStatus.length()); when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). @@ -389,9 +420,12 @@ public class AtlasClientTest { when(resourceCreator.createResource()).thenReturn(resourceObject); AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000"); + atlasClient.setService(resourceObject); + atlasClient.setConfiguration(configuration); atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); + verify(client).destroy(); verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build()); verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build()); @@ -399,8 +433,9 @@ public class AtlasClientTest { private WebResource.Builder getBuilder(WebResource resourceObject) { WebResource.Builder builder = mock(WebResource.Builder.class); - when(resourceObject.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); - when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + when(resourceObject.path(anyString())).thenReturn(resourceObject); + when(resourceObject.accept(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder); + when(builder.type(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder); return builder; } @@ -413,7 +448,7 @@ public class AtlasClientTest { } private AtlasClient getClientForTest(final String... baseUrls) { - return new AtlasClient(null, null, baseUrls) { + return new AtlasClient((UserGroupInformation)null, (String)null, baseUrls) { boolean firstCall = true; @Override protected String determineActiveServiceURL(String[] baseUrls, Client client) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/758b3d4d/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java index ace0d14..1eafc7d 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java @@ -19,6 +19,7 @@ package org.apache.atlas.web.resources; import com.sun.jersey.api.client.ClientResponse; + import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.services.MetadataService;
