ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98f4d40a Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98f4d40a Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98f4d40a Branch: refs/heads/master Commit: 98f4d40a17668c8ee65b58fcbcb6d09ea6a682e5 Parents: c1d4e7c Author: Suma Shivaprasad <[email protected]> Authored: Mon Apr 4 17:15:46 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Mon Apr 4 17:15:46 2016 -0700 ---------------------------------------------------------------------- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 10 +- .../main/java/org/apache/atlas/AtlasClient.java | 445 ++++++++++++++----- .../org/apache/atlas/AtlasServerEnsemble.java | 52 +++ .../java/org/apache/atlas/ResourceCreator.java | 28 ++ .../atlas/security/SecurityProperties.java | 42 -- .../java/org/apache/atlas/AtlasClientTest.java | 323 ++++++++++++-- common/pom.xml | 5 + .../org/apache/atlas/ha/HAConfiguration.java | 173 +++++++ .../atlas/security/SecurityProperties.java | 46 ++ .../apache/atlas/ha/HAConfigurationTest.java | 67 +++ release-log.txt | 1 + .../apache/atlas/ha/AtlasServerIdSelector.java | 82 ++++ .../org/apache/atlas/ha/HAConfiguration.java | 196 -------- .../atlas/ha/AtlasServerIdSelectorTest.java | 68 +++ .../apache/atlas/ha/HAConfigurationTest.java | 90 ---- .../org/apache/atlas/examples/QuickStart.java | 3 +- .../service/ActiveInstanceElectorService.java | 3 +- .../atlas/web/service/CuratorFactory.java | 2 +- .../security/NegativeSSLAndKerberosTest.java | 2 +- .../atlas/web/security/SSLAndKerberosTest.java | 2 +- .../org/apache/atlas/web/security/SSLTest.java | 2 +- 21 files changed, 1179 insertions(+), 463 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 50a5311..3a802d7 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -96,7 +96,15 @@ public class HiveMetaStoreBridge { UserGroupInformation ugi) throws Exception { this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), - new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser)); + atlasConf, doAsUser, ugi); + } + + HiveMetaStoreBridge(String clusterName, Hive hiveClient, + Configuration atlasConf, String doAsUser, UserGroupInformation ugi) { + this.clusterName = clusterName; + this.hiveClient = hiveClient; + String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL); + this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(",")); } HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 0bb5264..21f21eb 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -18,6 +18,7 @@ package org.apache.atlas; +import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; @@ -42,6 +43,8 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.net.ConnectException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -91,23 +94,66 @@ public class AtlasClient { public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String UNKNOWN_STATUS = "Unknown status"; - private WebResource service; + public static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; + // Setting the default value based on testing failovers while client code like quickstart is running. + public static final int DEFAULT_NUM_RETRIES = 4; + public static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; + // Setting the default value based on testing failovers while client code like quickstart is running. + // With number of retries, this gives a total time of about 20s for the server to start. + public static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000; - protected AtlasClient() { - //do nothing. For LocalAtlasClient - } + private WebResource service; + private AtlasClientContext atlasClientContext; + private Configuration configuration; + /** + * Create a new AtlasClient. + * + * @param baseUrl The URL of the Atlas server to connect to. + */ public AtlasClient(String baseUrl) { this(baseUrl, null, null); } + /** + * Create a new Atlas Client. + * @param baseUrl The URL of the Atlas server to connect to. + * @param ugi The {@link UserGroupInformation} of logged in user. + * @param doAsUser The user on whose behalf queries will be executed. + */ public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) { + initializeState(new String[] {baseUrl}, ugi, doAsUser); + } + + /** + * Create a new Atlas client. + * @param ugi The {@link UserGroupInformation} of logged in user, can be null in unsecure mode. + * @param doAsUser The user on whose behalf queries will be executed, can be null in unsecure mode. + * @param baseUrls A list of URLs that point to an ensemble of Atlas servers working in + * High Availability mode. The client will automatically determine the + * active instance on startup and also when there is a scenario of + * failover. + */ + public AtlasClient(UserGroupInformation ugi, String doAsUser, String... baseUrls) { + initializeState(baseUrls, ugi, doAsUser); + } + + private void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) { + configuration = getClientProperties(); + Client client = getClient(configuration, ugi, doAsUser); + String activeServiceUrl = determineActiveServiceURL(baseUrls, client); + atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser); + service = client.resource(UriBuilder.fromUri(activeServiceUrl).build()); + } + + @VisibleForTesting + protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) { DefaultClientConfig config = new DefaultClientConfig(); Configuration clientConfig = null; int readTimeout = 60000; int connectTimeout = 60000; try { - clientConfig = getClientProperties(); + clientConfig = configuration; if (clientConfig.getBoolean(TLS_ENABLED, false)) { // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced // to create a @@ -124,26 +170,109 @@ public class AtlasClient { SecureClientUtils.getClientConnectionHandler(config, clientConfig, doAsUser, ugi); Client client = new Client(handler, config); - client.resource(UriBuilder.fromUri(baseUrl).build()); client.setReadTimeout(readTimeout); client.setConnectTimeout(connectTimeout); + return client; + } - service = client.resource(UriBuilder.fromUri(baseUrl).build()); + @VisibleForTesting + protected String determineActiveServiceURL(String[] baseUrls, Client client) { + if (baseUrls.length == 0) { + throw new IllegalArgumentException("Base URLs cannot be null or empty"); + } + String baseUrl; + AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls); + if (atlasServerEnsemble.hasSingleInstance()) { + baseUrl = atlasServerEnsemble.firstURL(); + LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl); + return baseUrl; + } else { + try { + baseUrl = selectActiveServerAddress(client, atlasServerEnsemble); + } catch (AtlasServiceException e) { + LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e); + throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e); + } + } + return baseUrl; + } + + private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble) + throws AtlasServiceException { + List<String> serverInstances = serverEnsemble.getMembers(); + String activeServerAddress = null; + for (String serverInstance : serverInstances) { + LOG.info("Trying with address {}", serverInstance); + activeServerAddress = getAddressIfActive(client, serverInstance); + if (activeServerAddress != null) { + LOG.info("Found service {} as active service.", serverInstance); + break; + } + } + if (activeServerAddress != null) + return activeServerAddress; + else + throw new AtlasServiceException(API.STATUS, new RuntimeException("Could not find any active instance")); } - // for testing - AtlasClient(WebResource service) { + private String getAddressIfActive(Client client, String serverInstance) { + String activeServerAddress = null; + for (int i = 0; i < getNumberOfRetries(); i++) { + try { + WebResource service = client.resource(UriBuilder.fromUri(serverInstance).build()); + String adminStatus = getAdminStatus(service); + if (adminStatus.equals("ACTIVE")) { + activeServerAddress = serverInstance; + break; + } else { + LOG.info("Service {} is not active.. will retry.", serverInstance); + } + } catch (Exception e) { + LOG.error("Could not get status from service {} after {} tries.", serverInstance, i, e); + } + sleepBetweenRetries(); + LOG.warn("Service {} is not active.", serverInstance); + } + return activeServerAddress; + } + + private void sleepBetweenRetries(){ + try { + Thread.sleep(getSleepBetweenRetriesMs()); + } catch (InterruptedException e) { + LOG.error("Interrupted from sleeping between retries.", e); + } + } + + private int getSleepBetweenRetriesMs() { + return configuration.getInt(ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, DEFAULT_SLEEP_BETWEEN_RETRIES_MS); + } + + private int getNumberOfRetries() { + return configuration.getInt(ATLAS_CLIENT_HA_RETRIES_KEY, DEFAULT_NUM_RETRIES); + } + + @VisibleForTesting + AtlasClient(WebResource service, Configuration configuration) { this.service = service; + this.configuration = configuration; } - protected Configuration getClientProperties() throws AtlasException { - return ApplicationProperties.get(); + protected Configuration getClientProperties() { + try { + if (configuration == null) { + configuration = ApplicationProperties.get(); + } + } catch (AtlasException e) { + LOG.error("Exception while loading configuration.", e); + } + return configuration; } public boolean isServerReady() throws AtlasServiceException { WebResource resource = getResource(API.VERSION); try { - callAPIWithResource(API.VERSION, resource); + callAPIWithResource(API.VERSION, resource, null); return true; } catch (ClientHandlerException che) { return false; @@ -164,9 +293,31 @@ public class AtlasClient { * @throws AtlasServiceException if there is a HTTP error. */ public String getAdminStatus() throws AtlasServiceException { + return getAdminStatus(service); + } + + private void handleClientHandlerException(ClientHandlerException che) { + if (isRetryableException(che)) { + atlasClientContext.getClient().destroy(); + LOG.warn("Destroyed current context while handling ClientHandlerEception."); + LOG.warn("Will retry and create new context."); + sleepBetweenRetries(); + initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(), + atlasClientContext.getDoAsUser()); + return; + } + throw che; + } + + private boolean isRetryableException(ClientHandlerException che) { + return che.getCause().getClass().equals(IOException.class) + || che.getCause().getClass().equals(ConnectException.class); + } + + private String getAdminStatus(WebResource service) throws AtlasServiceException { String result = UNKNOWN_STATUS; - WebResource resource = getResource(API.STATUS); - JSONObject response = callAPIWithResource(API.STATUS, resource); + WebResource resource = getResource(service, API.STATUS); + JSONObject response = callAPIWithResource(API.STATUS, resource, null); try { result = response.getString("Status"); } catch (JSONException e) { @@ -282,9 +433,8 @@ public class AtlasClient { } public String getType(String typeName) throws AtlasServiceException { - WebResource resource = getResource(API.GET_TYPE, typeName); try { - JSONObject response = callAPIWithResource(API.GET_TYPE, resource); + JSONObject response = callAPI(API.GET_TYPE, null, typeName);; return response.getString(DEFINITION); } catch (AtlasServiceException e) { if (Response.Status.NOT_FOUND.equals(e.getStatus())) { @@ -366,11 +516,37 @@ public class AtlasClient { * @param attribute property key * @param value property value */ - public void updateEntityAttribute(String guid, String attribute, String value) throws AtlasServiceException { - API api = API.UPDATE_ENTITY_PARTIAL; - WebResource resource = getResource(api, guid); - resource = resource.queryParam(ATTRIBUTE_NAME, attribute); - callAPIWithResource(api, resource, value); + public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException { + callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() { + @Override + public WebResource createResource() { + API api = API.UPDATE_ENTITY_PARTIAL; + WebResource resource = getResource(api, guid); + resource = resource.queryParam(ATTRIBUTE_NAME, attribute); + return resource; + } + }); + } + + @VisibleForTesting + JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator) + throws AtlasServiceException { + for (int i = 0; i < getNumberOfRetries(); i++) { + WebResource resource = resourceCreator.createResource(); + try { + LOG.info("using resource {} for {} times", resource.getURI(), i); + JSONObject result = callAPIWithResource(api, resource, requestObject); + return result; + } catch (ClientHandlerException che) { + if (i==(getNumberOfRetries()-1)) { + throw che; + } + LOG.warn("Handled exception in calling api {}", api.getPath(), che); + LOG.warn("Exception's cause: {}", che.getCause().getClass()); + handleClientHandlerException(che); + } + } + throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries.")); } /** @@ -392,15 +568,20 @@ public class AtlasClient { * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity * @param entity entity definition */ - public String updateEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue, + public String updateEntity(final String entityType, final String uniqueAttributeName, final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException { - API api = API.UPDATE_ENTITY_PARTIAL; - WebResource resource = getResource(api, "qualifiedName"); - resource = resource.queryParam(TYPE, entityType); - resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName); - resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue); + final API api = API.UPDATE_ENTITY_PARTIAL; String entityJson = InstanceSerialization.toJson(entity, true); - JSONObject response = callAPIWithResource(api, resource, entityJson); + JSONObject response = callAPIWithRetries(api, entityJson, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(api, "qualifiedName"); + resource = resource.queryParam(TYPE, entityType); + resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName); + resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue); + return resource; + } + }); try { return response.getString(GUID); } catch (JSONException e) { @@ -415,13 +596,18 @@ public class AtlasClient { * @return List of deleted entity guids * @throws AtlasServiceException */ - public List<String> deleteEntities(String ... guids) throws AtlasServiceException { - API api = API.DELETE_ENTITIES; - WebResource resource = getResource(api); - for (String guid : guids) { - resource = resource.queryParam(GUID.toLowerCase(), guid); - } - JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource); + public List<String> deleteEntities(final String ... guids) throws AtlasServiceException { + JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() { + @Override + public WebResource createResource() { + API api = API.DELETE_ENTITIES; + WebResource resource = getResource(api); + for (String guid : guids) { + resource = resource.queryParam(GUID.toLowerCase(), guid); + } + return resource; + } + }); return extractResults(jsonResponse, GUID); } @@ -457,12 +643,18 @@ public class AtlasClient { * @return result object * @throws AtlasServiceException */ - public Referenceable getEntity(String entityType, String attribute, String value) throws AtlasServiceException { - WebResource resource = getResource(API.GET_ENTITY); - resource = resource.queryParam(TYPE, entityType); - resource = resource.queryParam(ATTRIBUTE_NAME, attribute); - resource = resource.queryParam(ATTRIBUTE_VALUE, value); - JSONObject jsonResponse = callAPIWithResource(API.GET_ENTITY, resource); + public Referenceable getEntity(final String entityType, final String attribute, final String value) + throws AtlasServiceException { + JSONObject jsonResponse = callAPIWithRetries(API.GET_ENTITY, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.GET_ENTITY); + resource = resource.queryParam(TYPE, entityType); + resource = resource.queryParam(ATTRIBUTE_NAME, attribute); + resource = resource.queryParam(ATTRIBUTE_VALUE, value); + return resource; + } + }); try { String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION); return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true); @@ -477,10 +669,15 @@ public class AtlasClient { * @return * @throws AtlasServiceException */ - public List<String> listEntities(String entityType) throws AtlasServiceException { - WebResource resource = getResource(API.LIST_ENTITIES); - resource = resource.queryParam(TYPE, entityType); - JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITIES, resource); + public List<String> listEntities(final String entityType) throws AtlasServiceException { + JSONObject jsonResponse = callAPIWithRetries(API.LIST_ENTITIES, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.LIST_ENTITIES); + resource = resource.queryParam(TYPE, entityType); + return resource; + } + }); return extractResults(jsonResponse, AtlasClient.RESULTS); } @@ -508,10 +705,15 @@ public class AtlasClient { * @return * @throws AtlasServiceException */ - public JSONArray search(String searchQuery) throws AtlasServiceException { - WebResource resource = getResource(API.SEARCH); - resource = resource.queryParam(QUERY, searchQuery); - JSONObject result = callAPIWithResource(API.SEARCH, resource); + public JSONArray search(final String searchQuery) throws AtlasServiceException { + JSONObject result = callAPIWithRetries(API.SEARCH, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.SEARCH); + resource = resource.queryParam(QUERY, searchQuery); + return resource; + } + }); try { return result.getJSONArray(RESULTS); } catch (JSONException e) { @@ -521,34 +723,21 @@ public class AtlasClient { } /** - * Search given type name, an attribute and its value. Uses search dsl - * @param typeName name of the entity type - * @param attributeName attribute name - * @param attributeValue attribute value - * @return result json object - * @throws AtlasServiceException - */ - public JSONArray rawSearch(String typeName, String attributeName, Object attributeValue) - throws AtlasServiceException { - // String gremlinQuery = String.format( - // "g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()", - // typeName, typeName, attributeName, attributeValue); - // return searchByGremlin(gremlinQuery); - String dslQuery = String.format("%s where %s = \"%s\"", typeName, attributeName, attributeValue); - return searchByDSL(dslQuery); - } - - /** * Search given query DSL * @param query DSL query * @return result json object * @throws AtlasServiceException */ - public JSONArray searchByDSL(String query) throws AtlasServiceException { + public JSONArray searchByDSL(final String query) throws AtlasServiceException { LOG.debug("DSL query: {}", query); - WebResource resource = getResource(API.SEARCH_DSL); - resource = resource.queryParam(QUERY, query); - JSONObject result = callAPIWithResource(API.SEARCH_DSL, resource); + JSONObject result = callAPIWithRetries(API.SEARCH_DSL, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.SEARCH_DSL); + resource = resource.queryParam(QUERY, query); + return resource; + } + }); try { return result.getJSONArray(RESULTS); } catch (JSONException e) { @@ -562,11 +751,16 @@ public class AtlasClient { * @return result json object * @throws AtlasServiceException */ - public JSONArray searchByGremlin(String gremlinQuery) throws AtlasServiceException { + public JSONArray searchByGremlin(final String gremlinQuery) throws AtlasServiceException { LOG.debug("Gremlin query: " + gremlinQuery); - WebResource resource = getResource(API.SEARCH_GREMLIN); - resource = resource.queryParam(QUERY, gremlinQuery); - JSONObject result = callAPIWithResource(API.SEARCH_GREMLIN, resource); + JSONObject result = callAPIWithRetries(API.SEARCH_GREMLIN, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.SEARCH_GREMLIN); + resource = resource.queryParam(QUERY, gremlinQuery); + return resource; + } + }); try { return result.getJSONArray(RESULTS); } catch (JSONException e) { @@ -580,10 +774,15 @@ public class AtlasClient { * @return result json object * @throws AtlasServiceException */ - public JSONObject searchByFullText(String query) throws AtlasServiceException { - WebResource resource = getResource(API.SEARCH_FULL_TEXT); - resource = resource.queryParam(QUERY, query); - return callAPIWithResource(API.SEARCH_FULL_TEXT, resource); + public JSONObject searchByFullText(final String query) throws AtlasServiceException { + return callAPIWithRetries(API.SEARCH_FULL_TEXT, null, new ResourceCreator() { + @Override + public WebResource createResource() { + WebResource resource = getResource(API.SEARCH_FULL_TEXT); + resource = resource.queryParam(QUERY, query); + return resource; + } + }); } public JSONObject getInputGraph(String datasetName) throws AtlasServiceException { @@ -604,15 +803,11 @@ public class AtlasClient { } } - public String getRequestId(JSONObject json) throws AtlasServiceException { - try { - return json.getString(REQUEST_ID); - } catch (JSONException e) { - throw new AtlasServiceException(e); - } + private WebResource getResource(API api, String... pathParams) { + return getResource(service, api, pathParams); } - private WebResource getResource(API api, String... pathParams) { + private WebResource getResource(WebResource service, API api, String... pathParams) { WebResource resource = service.path(api.getPath()); if (pathParams != null) { for (String pathParam : pathParams) { @@ -622,29 +817,75 @@ public class AtlasClient { return resource; } - private JSONObject callAPIWithResource(API api, WebResource resource) throws AtlasServiceException { - return callAPIWithResource(api, resource, null); - } - private JSONObject callAPIWithResource(API api, WebResource resource, Object requestObject) throws AtlasServiceException { - ClientResponse clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE) - .method(api.getMethod(), ClientResponse.class, requestObject); - - if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { - String responseAsString = clientResponse.getEntity(String.class); - try { - return new JSONObject(responseAsString); - } catch (JSONException e) { - throw new AtlasServiceException(api, e); + ClientResponse clientResponse = null; + for (int i = 0; i < getNumberOfRetries(); i++) { + clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE) + .method(api.getMethod(), ClientResponse.class, requestObject); + + if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { + String responseAsString = clientResponse.getEntity(String.class); + try { + return new JSONObject(responseAsString); + } catch (JSONException e) { + throw new AtlasServiceException(api, e); + } + } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { + break; + } else { + LOG.error("Got a service unavailable when calling: {}, will retry..", resource); + sleepBetweenRetries(); } } throw new AtlasServiceException(api, clientResponse); } - private JSONObject callAPI(API api, Object requestObject, String... pathParams) throws AtlasServiceException { - WebResource resource = getResource(api, pathParams); - return callAPIWithResource(api, resource, requestObject); + private JSONObject callAPI(final API api, Object requestObject, final String... pathParams) + throws AtlasServiceException { + return callAPIWithRetries(api, requestObject, new ResourceCreator() { + @Override + public WebResource createResource() { + return getResource(api, pathParams); + } + }); } + + /** + * A class to capture input state while creating the client. + * + * The information here will be reused when the client is re-initialized on switch-over + * in case of High Availability. + */ + private class AtlasClientContext { + private String[] baseUrls; + private Client client; + private final UserGroupInformation ugi; + private final String doAsUser; + + public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) { + this.baseUrls = baseUrls; + this.client = client; + this.ugi = ugi; + this.doAsUser = doAsUser; + } + + public UserGroupInformation getUgi() { + return ugi; + } + + public String getDoAsUser() { + return doAsUser; + } + + public Client getClient() { + return client; + } + + public String[] getBaseUrls() { + return baseUrls; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java b/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java new file mode 100644 index 0000000..96df6a3 --- /dev/null +++ b/client/src/main/java/org/apache/atlas/AtlasServerEnsemble.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import scala.actors.threadpool.Arrays; + +import java.util.List; + +public class AtlasServerEnsemble { + + private final String[] urls; + + public AtlasServerEnsemble(String[] baseUrls) { + Preconditions.checkArgument((baseUrls!=null && baseUrls.length>0), + "List of baseURLs cannot be null or empty."); + for (String baseUrl : baseUrls) { + Preconditions.checkArgument(StringUtils.isNotEmpty(baseUrl), + "Base URL cannot be null or empty."); + } + urls = baseUrls; + } + + public boolean hasSingleInstance() { + return urls.length==1; + } + + public String firstURL() { + return urls[0]; + } + + public List<String> getMembers() { + return Arrays.asList(urls); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/ResourceCreator.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/ResourceCreator.java b/client/src/main/java/org/apache/atlas/ResourceCreator.java new file mode 100644 index 0000000..53f92aa --- /dev/null +++ b/client/src/main/java/org/apache/atlas/ResourceCreator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.sun.jersey.api.client.WebResource; + +/** + * An interface to capture the closure of how a WebResource is created. + */ +public interface ResourceCreator { + WebResource createResource(); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java b/client/src/main/java/org/apache/atlas/security/SecurityProperties.java deleted file mode 100644 index b6c8c9b..0000000 --- a/client/src/main/java/org/apache/atlas/security/SecurityProperties.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.security; - -import java.util.Arrays; -import java.util.List; - -/** - * - */ -public interface SecurityProperties { - String TLS_ENABLED = "atlas.enableTLS"; - String KEYSTORE_FILE_KEY = "keystore.file"; - String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore"; - String KEYSTORE_PASSWORD_KEY = "keystore.password"; - String TRUSTSTORE_FILE_KEY = "truststore.file"; - String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore"; - String TRUSTSTORE_PASSWORD_KEY = "truststore.password"; - String SERVER_CERT_PASSWORD_KEY = "password"; - String CLIENT_AUTH_KEY = "client.auth.enabled"; - String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path"; - String SSL_CLIENT_PROPERTIES = "ssl-client.xml"; - String BIND_ADDRESS = "atlas.server.bind.address"; - String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites"; - List<String> DEFAULT_CIPHER_SUITES = Arrays.asList(".*NULL.*", ".*RC4.*", ".*MD5.*",".*DES.*",".*DSS.*"); - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/client/src/test/java/org/apache/atlas/AtlasClientTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java index 943301b..8911bf5 100644 --- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -17,28 +17,58 @@ package org.apache.atlas; +import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; + +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.fail; public class AtlasClientTest { + @Mock + private WebResource service; + + @Mock + private Configuration configuration; + + @Mock + private Client client; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void shouldVerifyServerIsReady() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); + setupRetryParams(); - WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); + AtlasClient atlasClient = new AtlasClient(service, configuration); + + WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," + @@ -49,19 +79,16 @@ public class AtlasClientTest { } private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) { - WebResource adminVersionResource = mock(WebResource.class); - when(webResource.path(api.getPath())).thenReturn(adminVersionResource); - WebResource.Builder builder = mock(WebResource.Builder.class); - when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); - when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + when(webResource.path(api.getPath())).thenReturn(service); + WebResource.Builder builder = getBuilder(service); return builder; } @Test public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); - WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); + setupRetryParams(); + AtlasClient atlasClient = new AtlasClient(service, configuration); + WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service); when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow( new ClientHandlerException()); assertFalse(atlasClient.isServerReady()); @@ -69,9 +96,9 @@ public class AtlasClientTest { @Test public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); - WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); + setupRetryParams(); + AtlasClient atlasClient = new AtlasClient(service, configuration); + WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); @@ -83,9 +110,10 @@ public class AtlasClientTest { @Test(expectedExceptions = AtlasServiceException.class) public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); - WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, webResource); + setupRetryParams(); + + AtlasClient atlasClient = new AtlasClient(service, configuration); + WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); @@ -98,10 +126,11 @@ public class AtlasClientTest { @Test public void shouldGetAdminStatus() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); + setupRetryParams(); + + AtlasClient atlasClient = new AtlasClient(service, configuration); - WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getEntity(String.class)).thenReturn("{\"Status\":\"Active\"}"); @@ -113,10 +142,11 @@ public class AtlasClientTest { @Test(expectedExceptions = AtlasServiceException.class) public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); + setupRetryParams(); + + AtlasClient atlasClient = new AtlasClient(service, configuration); - WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); @@ -128,10 +158,10 @@ public class AtlasClientTest { @Test public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException { - WebResource webResource = mock(WebResource.class); - AtlasClient atlasClient = new AtlasClient(webResource); + setupRetryParams(); + AtlasClient atlasClient = new AtlasClient(service, configuration); - WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, webResource); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); ClientResponse response = mock(ClientResponse.class); when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}"); @@ -140,4 +170,245 @@ public class AtlasClientTest { String status = atlasClient.getAdminStatus(); assertEquals(status, AtlasClient.UNKNOWN_STATUS); } + + @Test + public void shouldReturnBaseURLAsPassedInURL() { + AtlasClient atlasClient = new AtlasClient(service, configuration); + + String serviceURL = atlasClient.determineActiveServiceURL(new String[]{"http://localhost:21000"}, client); + assertEquals(serviceURL, "http://localhost:21000"); + } + + @Test + public void shouldSelectActiveAmongMultipleServersIfHAIsEnabled() { + setupRetryParams(); + + when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service); + when(client.resource(UriBuilder.fromUri("http://localhost:41000").build())).thenReturn(service); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); + ClientResponse firstResponse = mock(ClientResponse.class); + when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(firstResponse.getEntity(String.class)).thenReturn("{\"Status\":\"PASSIVE\"}"); + ClientResponse secondResponse = mock(ClientResponse.class); + when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(secondResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). + thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse). + thenReturn(secondResponse); + + AtlasClient atlasClient = new AtlasClient(service, configuration); + + String serviceURL = atlasClient.determineActiveServiceURL( + new String[]{"http://localhost:31000", "http://localhost:41000"}, + client); + assertEquals(serviceURL, "http://localhost:41000"); + } + + @Test + public void shouldRetryUntilServiceBecomesActive() { + setupRetryParams(); + + when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}"); + ClientResponse nextResponse = mock(ClientResponse.class); + when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). + thenReturn(response).thenReturn(response).thenReturn(nextResponse); + + AtlasClient atlasClient = new AtlasClient(service, configuration); + + String serviceURL = atlasClient.determineActiveServiceURL( + new String[] {"http://localhost:31000","http://localhost:41000"}, + client); + assertEquals(serviceURL, "http://localhost:31000"); + } + + @Test + public void shouldRetryIfCannotConnectToServiceInitially() { + setupRetryParams(); + + when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}"); + ClientResponse nextResponse = mock(ClientResponse.class); + when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(nextResponse.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). + thenThrow(new ClientHandlerException("Simulating connection exception")). + thenReturn(response). + thenReturn(nextResponse); + + AtlasClient atlasClient = new AtlasClient(service, configuration); + + String serviceURL = atlasClient.determineActiveServiceURL( + new String[] {"http://localhost:31000","http://localhost:41000"}, + client); + assertEquals(serviceURL, "http://localhost:31000"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void shouldThrowExceptionIfActiveServerIsNotFound() { + setupRetryParams(); + + when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service); + WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}"); + when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)). + thenThrow(new ClientHandlerException("Simulating connection exception")). + thenReturn(response). + thenReturn(response); + + AtlasClient atlasClient = new AtlasClient(service, configuration); + + String serviceURL = atlasClient.determineActiveServiceURL( + new String[] {"http://localhost:31000","http://localhost:41000"}, + client); + assertNull(serviceURL); + } + + @Test + public void shouldRetryAPICallsOnClientHandlerException() throws AtlasServiceException, URISyntaxException { + setupRetryParams(); + + ResourceCreator resourceCreator = mock(ResourceCreator.class); + WebResource resourceObject = mock(WebResource.class); + when(resourceObject.getURI()). + thenReturn(new URI("http://localhost:31000/api/atlas/types")). + thenReturn(new URI("http://localhost:41000/api/atlas/types")). + thenReturn(new URI("http://localhost:41000/api/atlas/types")); + + WebResource.Builder builder = getBuilder(resourceObject); + + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + + when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). + thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). + thenReturn(response); + + when(resourceCreator.createResource()).thenReturn(resourceObject); + + AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000"); + + atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); + + verify(client).destroy(); + verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build()); + verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build()); + } + + @Test + public void shouldRetryWithSameClientIfSingleAddressIsUsed() throws URISyntaxException, AtlasServiceException { + setupRetryParams(); + + ResourceCreator resourceCreator = mock(ResourceCreator.class); + WebResource resourceObject = mock(WebResource.class); + when(resourceObject.getURI()). + thenReturn(new URI("http://localhost:31000/api/atlas/types")); + + WebResource.Builder builder = getBuilder(resourceObject); + + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + + when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). + thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). + thenReturn(response); + + when(resourceCreator.createResource()).thenReturn(resourceObject); + + AtlasClient atlasClient = getClientForTest("http://localhost:31000"); + + atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); + + verify(client).destroy(); + verify(client, times(2)).resource(UriBuilder.fromUri("http://localhost:31000").build()); + } + + @Test + public void shouldRetryAPICallsOnServiceUnavailable() throws AtlasServiceException, URISyntaxException { + setupRetryParams(); + + ResourceCreator resourceCreator = mock(ResourceCreator.class); + WebResource resourceObject = mock(WebResource.class); + when(resourceObject.getURI()). + thenReturn(new URI("http://localhost:31000/api/atlas/types")). + thenReturn(new URI("http://localhost:41000/api/atlas/types")). + thenReturn(new URI("http://localhost:41000/api/atlas/types")); + + WebResource.Builder builder = getBuilder(resourceObject); + + ClientResponse firstResponse = mock(ClientResponse.class); + when(firstResponse.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); + when(firstResponse.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); + + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Status\":\"ACTIVE\"}"); + + when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)). + thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())). + thenReturn(firstResponse). + thenReturn(response); + + when(resourceCreator.createResource()).thenReturn(resourceObject); + + AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000"); + + atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator); + + verify(client).destroy(); + verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build()); + verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build()); + } + + private WebResource.Builder getBuilder(WebResource resourceObject) { + WebResource.Builder builder = mock(WebResource.Builder.class); + when(resourceObject.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + return builder; + } + + private void setupRetryParams() { + when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasClient.DEFAULT_NUM_RETRIES)). + thenReturn(3); + when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, + AtlasClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS)). + thenReturn(1); + } + + private AtlasClient getClientForTest(final String... baseUrls) { + return new AtlasClient(null, null, baseUrls) { + boolean firstCall = true; + @Override + protected String determineActiveServiceURL(String[] baseUrls, Client client) { + String returnUrl = baseUrls[0]; + if (baseUrls.length > 1 && !firstCall) { + returnUrl = baseUrls[1]; + } + firstCall = false; + return returnUrl; + } + + @Override + protected Configuration getClientProperties() { + return configuration; + } + + @Override + protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) { + return client; + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index 1f3d96e..614b3f6 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -51,5 +51,10 @@ <artifactId>commons-configuration</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java new file mode 100644 index 0000000..2e86a19 --- /dev/null +++ b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * A wrapper for getting configuration entries related to HighAvailability. + */ +public final class HAConfiguration { + + private HAConfiguration() { + } + + private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class); + + public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha."; + public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + "enabled"; + public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address."; + public static final String ATLAS_SERVER_IDS = "atlas.server.ids"; + public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + "zookeeper.connect"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000; + public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = + ATLAS_SERVER_HA_PREFIX + "zookeeper.retry.sleeptime.ms"; + public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + "zookeeper.num.retries"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3; + public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = + ATLAS_SERVER_HA_PREFIX + "zookeeper.session.timeout.ms"; + public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000; + + /** + * Return whether HA is enabled or not. + * @param configuration underlying configuration instance + * @return + */ + public static boolean isHAEnabled(Configuration configuration) { + return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false); + } + + /** + * Get the web server address that a server instance with the passed ID is bound to. + * + * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether + * the URL is http or https. + * + * @param configuration underlying configuration + * @param serverId serverId whose host:port property is picked to build the web server address. + * @return + */ + public static String getBoundAddressForId(Configuration configuration, String serverId) { + String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId); + boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED); + String protocol = (isSecure) ? "https://" : "http://"; + return protocol + hostPort; + } + + public static List<String> getServerInstances(Configuration configuration) { + String[] serverIds = configuration.getStringArray(ATLAS_SERVER_IDS); + List<String> serverInstances = new ArrayList<>(serverIds.length); + for (String serverId : serverIds) { + serverInstances.add(getBoundAddressForId(configuration, serverId)); + } + return serverInstances; + } + + /** + * A collection of Zookeeper specific configuration that is used by High Availability code. + */ + public static class ZookeeperProperties { + private String connectString; + private int retriesSleepTimeMillis; + private int numRetries; + private int sessionTimeout; + + public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries, + int sessionTimeout) { + this.connectString = connectString; + this.retriesSleepTimeMillis = retriesSleepTimeMillis; + this.numRetries = numRetries; + this.sessionTimeout = sessionTimeout; + } + + public String getConnectString() { + return connectString; + } + + public int getRetriesSleepTimeMillis() { + return retriesSleepTimeMillis; + } + + public int getNumRetries() { + return numRetries; + } + + public int getSessionTimeout() { + return sessionTimeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ZookeeperProperties that = (ZookeeperProperties) o; + + if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) { + return false; + } + + if (numRetries != that.numRetries) { + return false; + } + + if (sessionTimeout != that.sessionTimeout) { + return false; + } + + return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null); + + } + + @Override + public int hashCode() { + int result = connectString != null ? connectString.hashCode() : 0; + result = 31 * result + retriesSleepTimeMillis; + result = 31 * result + numRetries; + result = 31 * result + sessionTimeout; + return result; + } + } + + public static ZookeeperProperties getZookeeperProperties(Configuration configuration) { + String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect"); + if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) { + zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT); + } + + int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS, + DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS); + + int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES); + + int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS, + DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS); + return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/main/java/org/apache/atlas/security/SecurityProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/security/SecurityProperties.java b/common/src/main/java/org/apache/atlas/security/SecurityProperties.java new file mode 100644 index 0000000..191d869 --- /dev/null +++ b/common/src/main/java/org/apache/atlas/security/SecurityProperties.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.security; + +import java.util.Arrays; +import java.util.List; + +/** + * + */ +public final class SecurityProperties { + + private SecurityProperties() { + } + + public static final String TLS_ENABLED = "atlas.enableTLS"; + public static final String KEYSTORE_FILE_KEY = "keystore.file"; + public static final String DEFAULT_KEYSTORE_FILE_LOCATION = "target/atlas.keystore"; + public static final String KEYSTORE_PASSWORD_KEY = "keystore.password"; + public static final String TRUSTSTORE_FILE_KEY = "truststore.file"; + public static final String DEFATULT_TRUSTORE_FILE_LOCATION = "target/atlas.keystore"; + public static final String TRUSTSTORE_PASSWORD_KEY = "truststore.password"; + public static final String SERVER_CERT_PASSWORD_KEY = "password"; + public static final String CLIENT_AUTH_KEY = "client.auth.enabled"; + public static final String CERT_STORES_CREDENTIAL_PROVIDER_PATH = "cert.stores.credential.provider.path"; + public static final String SSL_CLIENT_PROPERTIES = "ssl-client.xml"; + public static final String BIND_ADDRESS = "atlas.server.bind.address"; + public static final String ATLAS_SSL_EXCLUDE_CIPHER_SUITES = "atlas.ssl.exclude.cipher.suites"; + public static final List<String> DEFAULT_CIPHER_SUITES = Arrays.asList( + ".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*"); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java b/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java new file mode 100644 index 0000000..8f0b9c5 --- /dev/null +++ b/common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class HAConfigurationTest { + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR); + } + + @Test + public void testShouldReturnHTTPSBoundAddress() { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443"); + when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true); + + String address = HAConfiguration.getBoundAddressForId(configuration, "id1"); + + assertEquals(address, "https://127.0.0.1:21443"); + } + + @Test + public void testShouldReturnListOfAddressesInConfig() { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:31000"); + + List<String> serverInstances = HAConfiguration.getServerInstances(configuration); + assertEquals(serverInstances.size(), 2); + assertTrue(serverInstances.contains("http://127.0.0.1:21000")); + assertTrue(serverInstances.contains("http://127.0.0.1:31000")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 586a49e..10fcdbb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-571 Modify Atlas client for necessary changes in context of HA (yhemanth via sumasai) ATLAS-620 Disable hbase based entity audit (shwethags) ATLAS-618 Fix assembly for hdfs-module (sumasai via yhemanth) ATLAS-573 Inherited attributes disappear from entities after server restart (dkantor via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java b/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java new file mode 100644 index 0000000..f3d36a7 --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/ha/AtlasServerIdSelector.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.net.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +public class AtlasServerIdSelector { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasServerIdSelector.class); + + /** + * Return the ID corresponding to this Atlas instance. + * + * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key + * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where + * the host is a local IP address and port is set in the system property + * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}. + * + * @param configuration + * @return + * @throws AtlasException if no ID is found that maps to a local IP Address or port + */ + public static String selectServerId(Configuration configuration) throws AtlasException { + // ids are already trimmed by this method + String[] ids = configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS); + String matchingServerId = null; + int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT)); + for (String id : ids) { + String hostPort = configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +id); + if (!StringUtils.isEmpty(hostPort)) { + InetSocketAddress socketAddress; + try { + socketAddress = NetUtils.createSocketAddr(hostPort); + } catch (Exception e) { + LOG.warn("Exception while trying to get socket address for " + hostPort, e); + continue; + } + if (!socketAddress.isUnresolved() + && NetUtils.isLocalAddress(socketAddress.getAddress()) + && appPort == socketAddress.getPort()) { + LOG.info("Found matched server id " + id + " with host port: " + hostPort); + matchingServerId = id; + break; + } + } else { + LOG.info("Could not find matching address entry for id: " + id); + } + } + if (matchingServerId == null) { + String msg = String.format("Could not find server id for this instance. " + + "Unable to find IDs matching any local host and port binding among %s", + StringUtils.join(ids, ",")); + throw new AtlasException(msg); + } + return matchingServerId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98f4d40a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java deleted file mode 100644 index 06977c5..0000000 --- a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.ha; - -import org.apache.atlas.AtlasConstants; -import org.apache.atlas.AtlasException; -import org.apache.atlas.security.SecurityProperties; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.net.NetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; - -/** - * A wrapper for getting configuration entries related to HighAvailability. - */ -public class HAConfiguration { - - private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class); - - public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha"; - public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled"; - public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address."; - public static final String ATLAS_SERVER_IDS = "atlas.server.ids"; - public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect"; - public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000; - public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms"; - public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries"; - public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3; - public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms"; - public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000; - - /** - * Return whether HA is enabled or not. - * @param configuration underlying configuration instance - * @return - */ - public static boolean isHAEnabled(Configuration configuration) { - return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false); - } - - /** - * Return the ID corresponding to this Atlas instance. - * - * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key - * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where - * the host is a local IP address and port is set in the system property - * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}. - * - * @param configuration - * @return - * @throws AtlasException if no ID is found that maps to a local IP Address or port - */ - public static String getAtlasServerId(Configuration configuration) throws AtlasException { - // ids are already trimmed by this method - String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS); - String matchingServerId = null; - int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT)); - for (String id : ids) { - String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id); - if (!StringUtils.isEmpty(hostPort)) { - InetSocketAddress socketAddress; - try { - socketAddress = NetUtils.createSocketAddr(hostPort); - } catch (Exception e) { - LOG.warn("Exception while trying to get socket address for " + hostPort, e); - continue; - } - if (!socketAddress.isUnresolved() - && NetUtils.isLocalAddress(socketAddress.getAddress()) - && appPort == socketAddress.getPort()) { - LOG.info("Found matched server id " + id + " with host port: " + hostPort); - matchingServerId = id; - break; - } - } else { - LOG.info("Could not find matching address entry for id: " + id); - } - } - if (matchingServerId == null) { - String msg = String.format("Could not find server id for this instance. " + - "Unable to find IDs matching any local host and port binding among %s", - StringUtils.join(ids, ",")); - throw new AtlasException(msg); - } - return matchingServerId; - } - - /** - * Get the web server address that a server instance with the passed ID is bound to. - * - * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether - * the URL is http or https. - * - * @param configuration underlying configuration - * @param serverId serverId whose host:port property is picked to build the web server address. - * @return - */ - public static String getBoundAddressForId(Configuration configuration, String serverId) { - String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId); - boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED); - String protocol = (isSecure) ? "https://" : "http://"; - return protocol + hostPort; - } - - /** - * A collection of Zookeeper specific configuration that is used by High Availability code - */ - public static class ZookeeperProperties { - private String connectString; - private int retriesSleepTimeMillis; - private int numRetries; - private int sessionTimeout; - - public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries, - int sessionTimeout) { - this.connectString = connectString; - this.retriesSleepTimeMillis = retriesSleepTimeMillis; - this.numRetries = numRetries; - this.sessionTimeout = sessionTimeout; - } - - public String getConnectString() { - return connectString; - } - - public int getRetriesSleepTimeMillis() { - return retriesSleepTimeMillis; - } - - public int getNumRetries() { - return numRetries; - } - - public int getSessionTimeout() { - return sessionTimeout; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ZookeeperProperties that = (ZookeeperProperties) o; - - if (retriesSleepTimeMillis != that.retriesSleepTimeMillis) return false; - if (numRetries != that.numRetries) return false; - if (sessionTimeout != that.sessionTimeout) return false; - return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null); - - } - - @Override - public int hashCode() { - int result = connectString != null ? connectString.hashCode() : 0; - result = 31 * result + retriesSleepTimeMillis; - result = 31 * result + numRetries; - result = 31 * result + sessionTimeout; - return result; - } - } - - public static ZookeeperProperties getZookeeperProperties(Configuration configuration) { - String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect"); - if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) { - zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT); - } - - int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS, - DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS); - - int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES); - - int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS, - DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS); - return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout); - } -}
