http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java deleted file mode 100644 index d9142c7..0000000 --- a/tagsync/src/main/java/org/apache/ranger/source/atlas/TagAtlasSource.java +++ /dev/null @@ -1,588 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.source.atlas; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -import com.google.inject.Guice; -import com.google.inject.Injector; - -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.atlas.typesystem.EntityImpl; -import org.apache.atlas.typesystem.IdImpl; -import org.apache.atlas.typesystem.TraitImpl; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.atlas.notification.NotificationModule; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.notification.entity.EntityNotificationConsumer; -import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider; -import org.apache.atlas.typesystem.api.Entity; -import org.apache.atlas.typesystem.api.Trait; -import org.apache.ranger.admin.client.datatype.RESTResponse; -import org.apache.ranger.model.TagSink; -import org.apache.ranger.model.TagSource; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.plugin.util.RangerRESTUtils; -import org.apache.ranger.plugin.util.ServiceTags; -import org.apache.ranger.process.TagSyncConfig; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class TagAtlasSource implements TagSource { - private static final Log LOG = LogFactory.getLog(TagAtlasSource.class); - - - private final Map<String, Entity> entities = new LinkedHashMap<>(); - private TagSink tagSink; - private Properties properties; - private ConsumerRunnable consumerTask; - - @Override - public boolean initialize(Properties properties) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.initialize()"); - } - - boolean ret = true; - - if 
(properties == null || MapUtils.isEmpty(properties)) { - LOG.error("No properties specified for TagFileSource initialization"); - this.properties = new Properties(); - } else { - this.properties = properties; - } - - - NotificationModule notificationModule = new NotificationModule(); - - Injector injector = Guice.createInjector(notificationModule); - - EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class); - - consumerTask = new ConsumerRunnable(consumerProvider.get()); - - //ExecutorService executorService = Executors.newFixedThreadPool(1); - - //executorService.submit(new ConsumerRunnable(consumerProvider.get())); - - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.initialize(), result=" + ret); - } - return ret; - } - - @Override - public void setTagSink(TagSink sink) { - if (sink == null) { - LOG.error("Sink is null!!!"); - } else { - this.tagSink = sink; - } - } - - @Override - public Thread start() { - Thread consumerThread = null; - if (consumerTask == null) { - LOG.error("No consumerTask!!!"); - } else { - consumerThread = new Thread(consumerTask); - consumerThread.setDaemon(true); - consumerThread.start(); - } - return consumerThread; - } - - @Override - public void updateSink() throws Exception { - } - - @Override - public boolean isChanged() { - return true; - } - - // ----- inner class : ConsumerRunnable ------------------------------------ - - private class ConsumerRunnable implements Runnable { - - private final EntityNotificationConsumer consumer; - - private ConsumerRunnable(EntityNotificationConsumer consumer) { - this.consumer = consumer; - } - - - // ----- Runnable -------------------------------------------------------- - - @Override - public void run() { - while (consumer.hasNext()) { - try { - EntityNotification notification = consumer.next(); - Entity entity = notification.getEntity(); - printNotification(notification); - ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification, properties); - if (serviceTags == null) { - LOG.error("Failed to map Atlas notification to ServiceTags structure"); - } else { - if (LOG.isDebugEnabled()) { - String serviceTagsJSON = new Gson().toJson(serviceTags); - LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON); - } - - try { - tagSink.uploadServiceTags(serviceTags); - } catch (Exception exception) { - LOG.error("uploadServiceTags() failed..", exception); - } - } - } catch(Exception e){ - LOG.error("Exception encountered when processing notification:", e); - } - } - } - - public void printNotification(EntityNotification notification) { - Entity entity = notification.getEntity(); - if (LOG.isDebugEnabled()) { - LOG.debug("Notification-Type: " + notification.getOperationType().name()); - LOG.debug("Entity-Id: " + entity.getId().getGuid()); - LOG.debug("Entity-Type: " + entity.getTypeName()); - - LOG.debug("----------- Entity Values ----------"); - - - for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) { - LOG.debug(" Name:" + entry.getKey()); - Object value = entry.getValue(); - LOG.debug(" Value:" + value); - } - - LOG.debug("----------- Entity Traits ----------"); - - - for (Map.Entry<String, ? 
extends Trait> entry : entity.getTraits().entrySet()) { - LOG.debug(" Trait-Name:" + entry.getKey()); - Trait trait = entry.getValue(); - LOG.debug(" Trait-Type:" + trait.getTypeName()); - Map<String, Object> traitValues = trait.getValues(); - for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) { - LOG.debug(" Trait-Value-Name:" + valueEntry.getKey()); - LOG.debug(" Trait-Value:" + valueEntry.getValue()); - } - } - - } - } - - } - - public void printAllEntities() { - try { - new AtlasUtility().getAllEntities(); - } - catch(java.io.IOException ioException) { - LOG.error("Caught IOException while retrieving Atlas Entities:", ioException); - } - } - - // update the set of entities with current from Atlas - public void refreshAllEntities() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.refreshAllEntities()"); - } - AtlasUtility atlasUtility = new AtlasUtility(); - - try { - entities.putAll(atlasUtility.getAllEntities()); - } catch (IOException e) { - LOG.error("getAllEntities() failed", e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.refreshAllEntities()"); - } - } - - // Inner class AtlasUtil - - /** - * Atlas utility. 
- */ - @SuppressWarnings("unchecked") - private class AtlasUtility { - - /** - * Atlas APIs - */ - public static final String API_ATLAS_TYPES = "api/atlas/types"; - public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; - public static final String API_ATLAS_ENTITY = "api/atlas/entities/"; - public static final String API_ATLAS_TYPE = "api/atlas/types/"; - - /** - * API Response Attributes - */ - public static final String RESULTS_ATTRIBUTE = "results"; - public static final String DEFINITION_ATTRIBUTE = "definition"; - public static final String VALUES_ATTRIBUTE = "values"; - public static final String TRAITS_ATTRIBUTE = "traits"; - public static final String TYPE_NAME_ATTRIBUTE = "typeName"; - public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; - public static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; - public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; - public static final String NAME_ATTRIBUTE = "name"; - - private Type mapType = new TypeToken<Map<String, Object>>(){}.getType(); - - private RangerRESTClient restClient; - - - // ----- Constructors ------------------------------------------------------ - - /** - * Construct an AtlasUtility - * - */ - public AtlasUtility() { - - String url = TagSyncConfig.getAtlasEndpoint(properties); - String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties); - - - if(LOG.isDebugEnabled()) { - LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")"); - } - - restClient = new RangerRESTClient(url, sslConfigFileName); - - if(LOG.isDebugEnabled()) { - LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")"); - } - } - - - // ----- AtlasUtility ------------------------------------------------------ - - /** - * Get all of the entities defined in Atlas. 
- * - * @return a mapping of GUIDs to Atlas entities - * - * @throws IOException if there is an error communicating with Atlas - */ - public Map<String, Entity> getAllEntities() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.getAllEntities()"); - } - Map<String, Entity> entities = new LinkedHashMap<>(); - - Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES); - - List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); - - for (String type : types) { - - Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); - - List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); - - for (String guid : guids) { - - if (StringUtils.isNotBlank(guid)) { - - Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>(); - - Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); - - if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) { - String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class); - - LOG.info("{"); - LOG.info(" \"entity-id\":" + guid + ","); - LOG.info(" \"entity-definition\":" + definitionJSON); - LOG.info("}"); - - Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType); - - Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class); - Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); - String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class); - - LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")"); - - - Map<String, TraitImpl> traitMap = new HashMap<>(); - - if (MapUtils.isNotEmpty(traits)) { - - LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ "); - - for (Map.Entry<String, Object> entry : traits.entrySet()) { - - Map<String, Object> trait = (Map<String, Object>) entry.getValue(); - - Map<String, Object> traitValues = 
getAttribute(trait, VALUES_ATTRIBUTE, Map.class); - String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); - - Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); - - TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes); - - traitSuperTypes.put(trait1, superTypes); - - traitMap.put(entry.getKey(), trait1); - - - LOG.info(" Trait(typeName=" + traitTypeName + ")"); - - } - } else { - LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")"); - } - EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap); - - showEntity(entity); - - entities.put(guid, entity); - - } - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.getAllEntities()"); - } - return entities; - } - - - // ----- helper methods ---------------------------------------------------- - - private Map<String, Object> getTraitType(String traitName) - throws IOException { - - Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); - - if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) { - String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class); - - Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType); - - List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); - - if (traitTypes.size() > 0) { - return (Map<String, Object>) traitTypes.get(0); - } - } - return null; - } - - private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) - throws IOException { - - Map<String, TraitImpl> superTypes = new HashMap<>(); - - if (traitType != null) { - - List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); - - for (String superTypeName : superTypeNames) { - - Map<String, Object> superTraitType = getTraitType(superTypeName); - - if (superTraitType != null) { - List<Map<String, Object>> 
attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); - - Map<String, Object> superTypeValues = new HashMap<>(); - for (Map<String, Object> attributeDefinition : attributeDefinitions) { - - String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); - if (values.containsKey(attributeName)) { - superTypeValues.put(attributeName, values.get(attributeName)); - } - } - - superTypes.put(superTypeName, - //new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName)); - new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues))); - } - } - } - return superTypes; - } - - /* - private Map<String, Object> atlasAPI(String endpoint) throws IOException { - InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap()); - return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType); - } - */ - - private Map<String, Object> atlasAPI(String endpoint) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint +")"); - } - // Create a REST client and perform a get on it - Map<String, Object> ret = new HashMap<String, Object>(); - - WebResource webResource = restClient.getResource(endpoint); - - ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class); - - if(response != null && response.getStatus() == 200) { - ret = response.getEntity(ret.getClass()); - } else { - LOG.error("Atlas REST call returned with response={" + response +"}"); - - RESTResponse resp = RESTResponse.fromClientResponse(response); - LOG.error("Error getting Atlas Entity. 
request=" + webResource.toString() - + ", response=" + resp.toString()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")"); - } - return ret; - } - - private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return type.cast(map.get(name)); - } - - - - public void showEntity(Entity entity) { - - LOG.debug("Entity-id :" + entity.getId()); - - LOG.debug("Type: " + entity.getTypeName()); - - LOG.debug("----- Values -----"); - - for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) { - LOG.debug(" Name: " + entry.getKey() + ""); - Object value = entry.getValue(); - LOG.debug(" Value: " + getValue(value, entities.keySet())); - } - - LOG.debug("----- Traits -----"); - - for (String traitName : entity.getTraits().keySet()) { - LOG.debug(" Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName); - } - - } - - public void showTrait(Entity entity, String traitId) { - - String[] traitNames = traitId.split(","); - - Trait trait = entity.getTraits().get(traitNames[0]); - - for (int i = 1; i < traitNames.length; ++i ) { - trait = trait.getSuperTypes().get(traitNames[i]); - } - - String typeName = trait.getTypeName(); - - LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId()); - - LOG.debug("Type: " + typeName); - - LOG.debug("----- Values ------"); - - for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) { - LOG.debug("Name:" + entry.getKey()); - Object value = entry.getValue(); - LOG.debug("Value:" + getValue(value, entities.keySet())); - } - - LOG.debug("Super Traits"); - - - for (String traitName : trait.getSuperTypes().keySet()) { - LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName); - } - } - - // resolve the given value if necessary - private String getValue(Object value, Set<String> ids) { - if (value == null) { - return ""; - } - String idString = getIdValue(value, ids); - if (idString != null) { - 
return idString; - } - - idString = getIdListValue(value, ids); - if (idString != null) { - return idString; - } - - return value.toString(); - } - // get an id from the given value; return null if the value is not an id type - private String getIdValue(Object value, Set<String> ids) { - if (value instanceof Map) { - Map map = (Map) value; - if (map.size() == 3 && map.containsKey("id")){ - String id = map.get("id").toString(); - if (ids.contains(id)) { - return id; - } - } - } - return null; - } - // get an id list from the given value; return null if the value is not an id list type - private String getIdListValue(Object value, Set<String> ids) { - if (value instanceof List) { - List list = (List) value; - if (list.size() > 0) { - StringBuilder sb = new StringBuilder(); - for (Object o : list) { - String idString = getIdValue(o, ids); - if (idString == null) { - return value.toString(); - } - if (sb.length() > 0) { - sb.append(", "); - } - sb.append(idString); - } - return sb.toString(); - } - } - return null; - } - - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java deleted file mode 100644 index 25083de..0000000 --- a/tagsync/src/main/java/org/apache/ranger/source/file/TagFileSource.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.source.file; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.model.TagSink; -import org.apache.ranger.model.TagSource; -import org.apache.ranger.plugin.util.ServiceTags; -import org.apache.ranger.process.TagSyncConfig; - -import java.io.*; -import java.util.Date; -import java.util.Properties; - -public class TagFileSource implements TagSource, Runnable { - private static final Log LOG = LogFactory.getLog(TagFileSource.class); - - private String sourceFileName; - private long lastModifiedTimeInMillis = 0L; - - private Gson gson; - private TagSink tagSink; - private Properties properties; - - @Override - public boolean initialize(Properties properties) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.initialize()"); - } - - if (properties == null || MapUtils.isEmpty(properties)) { - LOG.error("No properties specified for TagFileSource initialization"); - this.properties = new Properties(); - } else { - this.properties = properties; - } - - boolean ret = true; - - if (ret) { - - sourceFileName = TagSyncConfig.getTagSourceFileName(properties); - - if (LOG.isDebugEnabled()) { - LOG.debug("Provided sourceFileName=" + sourceFileName); - } - - String realFileName = TagSyncConfig.getResourceFileName(sourceFileName); - if (realFileName != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Real sourceFileName=" + realFileName); - } - sourceFileName = realFileName; - } else { - LOG.error(sourceFileName + " is not a file or is not readable"); - ret = false; - } - } - - if (ret) { - try { - gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); - } catch (Throwable excp) { - LOG.fatal("failed to create GsonBuilder object", excp); - ret = false; - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== 
TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret); - } - - return ret; - } - - @Override - public void setTagSink(TagSink sink) { - if (sink == null) { - LOG.error("Sink is null!!!"); - } else { - this.tagSink = sink; - } - } - - @Override - public Thread start() { - - Thread fileMonitoringThread = null; - - fileMonitoringThread = new Thread(this); - fileMonitoringThread.setDaemon(true); - fileMonitoringThread.start(); - - return fileMonitoringThread; - } - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.run()"); - } - long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties); - boolean shutdownFlag = false; - - while (!shutdownFlag) { - - try { - if (isChanged()) { - LOG.info("Begin: update tags from source==>sink"); - if (TagSyncConfig.isTagSyncEnabled(properties)) { - updateSink(); - LOG.info("End: update tags from source==>sink"); - } else { - LOG.info("Tag-sync is not enabled."); - } - } else { - LOG.debug("TagFileSource: no change found for synchronization."); - } - - LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds"); - - Thread.sleep(sleepTimeBetweenCycleInMillis); - } - catch (InterruptedException e) { - LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e); - shutdownFlag = true; - } - catch (Throwable t) { - LOG.error("tag-sync thread got an error", t); - } - } - - LOG.info("Shutting down the Tag-file-source thread"); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.run()"); - } - } - - @Override - public void updateSink() throws Exception { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.updateSink()"); - } - ServiceTags serviceTags = readFromFile(); - - if (serviceTags != null) { - tagSink.uploadServiceTags(serviceTags); - } else { - LOG.error("Could not read ServiceTags from file"); - } - if 
(LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.updateSink()"); - } - } - - @Override - public boolean isChanged() { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.isChanged()"); - } - boolean ret = false; - - long modificationTime = getModificationTime(); - - if (modificationTime > lastModifiedTimeInMillis) { - if (LOG.isDebugEnabled()) { - Date modifiedDate = new Date(modificationTime); - Date lastModifiedDate = new Date(lastModifiedTimeInMillis); - LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate); - } - lastModifiedTimeInMillis = modificationTime; - ret = true; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.isChanged(): result=" + ret); - } - return ret; - } - - private ServiceTags readFromFile() { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); - } - - ServiceTags ret = null; - - Reader reader = null; - try { - - reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName)); - - ret = gson.fromJson(reader, ServiceTags.class); - - } - catch (FileNotFoundException exception) { - LOG.warn("Tag-source file does not exist or not readble '" + sourceFileName + "'"); - } - catch (Exception excp) { - LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp); - } - finally { - if (reader != null) { - try { - reader.close(); - } catch (Exception excp) { - LOG.error("error while closing opened Tag-source file " + sourceFileName, excp); - } - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); - } - - return ret; - } - - private long getModificationTime() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.getLastModificationTime(): sourceFileName=" + sourceFileName); - } - long ret = 0L; - - File sourceFile = new File(sourceFileName); - - if (sourceFile.exists() && sourceFile.isFile() && 
sourceFile.canRead()) { - ret = sourceFile.lastModified(); - } else { - ret = new Date().getTime(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.lastModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret)); - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java new file mode 100644 index 0000000..6565159 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.tagsync.model; + +import org.apache.ranger.plugin.store.TagStore; +import org.apache.ranger.plugin.util.ServiceTags; + +import java.util.Properties; + + +/** + * A {@link TagStore} that tag sources hand their {@link ServiceTags} to. + * The sources removed earlier in this patch (TagFileSource, TagAtlasSource) call + * {@code uploadServiceTags} on the old-package equivalent of this interface after reading or mapping tags. + */ +public interface TagSink extends TagStore { + /** + * Prepares the sink from the given configuration properties. + * NOTE(review): presumably returns true on success, mirroring the sources' initialize() - confirm against the implementation. + */ + boolean initialize(Properties properties); + /** + * Hands one {@link ServiceTags} payload to the sink. + * @throws Exception declared broadly; the callers visible in this patch either log it or let it propagate + */ + void uploadServiceTags(ServiceTags serviceTags) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java new file mode 100644 index 0000000..2df8036 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.tagsync.model; + +import java.util.Properties; + +/** + * A producer of tag information that pushes what it finds into a {@link TagSink}. + * The behaviour notes below are taken from the TagFileSource and TagAtlasSource implementations + * (old package) shown earlier in this patch; other implementations may differ. + */ +public interface TagSource { + + /** + * Configures the source from the given properties. + * TagFileSource returns false when its source file cannot be resolved; TagAtlasSource always returns true. + */ + boolean initialize(Properties properties); + + /** + * Sets the sink that receives tags. + * Both implementations in this patch log an error and keep the previous sink when {@code sink} is null. + */ + void setTagSink(TagSink sink); + + /** + * Pushes the current tags to the sink. + * TagFileSource reads its file and uploads the result; TagAtlasSource implements this as a no-op. + */ + void updateSink() throws Exception; + + /** + * Starts the source's background work and returns the thread running it. + * Both implementations in this patch start a daemon thread; TagAtlasSource returns null when it has no consumer task. + */ + Thread start(); + + /** + * Reports whether there is something new to synchronize. + * TagFileSource compares file modification times; TagAtlasSource always returns true. + */ + boolean isChanged(); + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java new file mode 100644 index 0000000..7fe6bdb --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.tagsync.process; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + +import java.io.*; +import java.net.URL; +import java.util.Properties; + +public class TagSyncConfig extends Configuration { + private static final Logger LOG = Logger.getLogger(TagSyncConfig.class) ; + + public static final String CONFIG_FILE = "ranger-tagsync-site.xml"; + + public static final String DEFAULT_CONFIG_FILE = "ranger-tagsync-default.xml"; + + public static final String TAGSYNC_ENABLED_PROP = "ranger.tagsync.enabled" ; + + public static final String TAGSYNC_LOGDIR_PROP = "ranger.tagsync.logdir" ; + + private static final String TAGSYNC_TAGADMIN_REST_URL_PROP = "ranger.tagsync.tagadmin.rest.url"; + + private static final String TAGSYNC_TAGADMIN_REST_SSL_CONFIG_FILE_PROP = "ranger.tagsync.tagadmin.rest.ssl.config.file"; + + private static final String TAGSYNC_TAGADMIN_SSL_BASICAUTH_USERNAME_PROP = "ranger.tagsync.tagadmin.basicauth.username"; + + private static final String TAGSYNC_TAGADMIN_SSL_BASICAUTH_PASSWORD_PROP = "ranger.tagsync.tagadmin.basicauth.password"; + + private static final String TAGSYNC_FILESOURCE_FILENAME_PROP = "ranger.tagsync.filesource.filename"; + + private static final String TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP = "ranger.tagsync.sleeptimeinmillisbetweensynccycle"; + + private static final String TAGSYNC_SOURCE_CLASS_PROP = "ranger.tagsync.source.impl.class"; + + private static final String TAGSYNC_SINK_CLASS_PROP = "ranger.tagsync.sink.impl.class"; + + private static final String TAGSYNC_ATLASSOURCE_ENDPOINT_PROP = "ranger.tagsync.atlassource.endpoint"; + + private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = "ranger.tagsync.atlas."; + + private static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = ".ranger.service"; + + private static final String TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR = "_"; + + 
private static volatile TagSyncConfig instance = null; // currently never assigned; getInstance() does not cache

    /**
     * Builds a configuration by reading the default and site resources.
     *
     * <p>A new object is returned on every call. {@code TagSynchronizer.initLoop()}
     * calls this again after a failed initialization attempt to re-read the
     * properties, so the result must not be cached here.
     *
     * @return a freshly loaded configuration
     */
    public static TagSyncConfig getInstance() {
        return new TagSyncConfig();
    }

    /**
     * @return the properties held by this configuration
     */
    public Properties getProperties() {
        return getProps();
    }

    /**
     * Opens {@code path} as a readable file if one exists, otherwise as a
     * classpath resource.
     *
     * <p>Lookup order: file system, this class (relative, then absolute
     * resource name), system class loader.
     *
     * @param path file path or resource name
     * @return an open stream, or {@code null} when nothing was found
     * @throws FileNotFoundException if the file disappears between the check and the open
     */
    public static InputStream getFileInputStream(String path) throws FileNotFoundException {
        File f = new File(path);
        if (f.exists() && f.isFile() && f.canRead()) {
            return new FileInputStream(f);
        }

        InputStream ret = TagSyncConfig.class.getResourceAsStream(path);
        if (ret == null && !path.startsWith("/")) {
            ret = TagSyncConfig.class.getResourceAsStream("/" + path);
        }

        if (ret == null) {
            ret = ClassLoader.getSystemClassLoader().getResourceAsStream(path);
        }
        // Class-loader resource names carry no leading '/', so the fallback strips one
        // (the previous code prepended one, which a class loader does not resolve).
        if (ret == null && path.startsWith("/")) {
            ret = ClassLoader.getSystemResourceAsStream(path.substring(1));
        }

        return ret;
    }

    /**
     * Resolves {@code path} to a file name, using the same lookup order as
     * {@link #getFileInputStream(String)}.
     *
     * @param path file path or resource name
     * @return {@code path} itself when it is a readable file, the file part of
     *         the resource URL when found on the classpath, or {@code null}
     *         when {@code path} is blank or cannot be located
     */
    public static String getResourceFileName(String path) {
        if (StringUtils.isBlank(path)) {
            return null;
        }

        File f = new File(path);
        if (f.exists() && f.isFile() && f.canRead()) {
            return path;
        }

        URL fileURL = TagSyncConfig.class.getResource(path);
        if (fileURL == null && !path.startsWith("/")) {
            fileURL = TagSyncConfig.class.getResource("/" + path);
        }

        if (fileURL == null) {
            fileURL = ClassLoader.getSystemClassLoader().getResource(path);
        }
        // See getFileInputStream(): class loaders expect names without a leading '/'.
        if (fileURL == null && path.startsWith("/")) {
            fileURL = ClassLoader.getSystemClassLoader().getResource(path.substring(1));
        }

        if (fileURL == null) {
            LOG.warn("URL not found for " + path + " or no privilege for reading file " + path);
            return null;
        }

        return fileURL.getFile();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();

        sb.append("DEFAULT_CONFIG_FILE=").append(DEFAULT_CONFIG_FILE).append(", ")
          .append("CONFIG_FILE=").append(CONFIG_FILE).append("\n\n");

        return sb.toString() + super.toString();
    }

    /**
     * Tag sync is enabled unless {@code ranger.tagsync.enabled} is explicitly
     * set to {@code false} (case-insensitive, surrounding whitespace ignored).
     *
     * @param prop tag-sync properties
     * @return {@code false} only when the property is present and equals "false"
     */
    public static boolean isTagSyncEnabled(Properties prop) {
        String val = prop.getProperty(TAGSYNC_ENABLED_PROP);
        // Fixed: the literal used to be misspelled "falae", so the switch could never turn sync off.
        return val == null || !"false".equalsIgnoreCase(val.trim());
    }

    public static String getTagSyncLogdir(Properties prop) {
        return prop.getProperty(TAGSYNC_LOGDIR_PROP);
    }

    /**
     * Reads the pause between synchronization cycles.
     *
     * @param prop tag-sync properties
     * @return the configured sleep time in milliseconds
     * @throws NumberFormatException if the property is missing, blank or not a number
     */
    public static long getSleepTimeInMillisBetweenCycle(Properties prop) {
        String val = prop.getProperty(TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP);
        if (StringUtils.isBlank(val)) {
            // Same exception type as before, but the message now names the missing property.
            throw new NumberFormatException("Property " + TAGSYNC_SLEEP_TIME_IN_MILLIS_BETWEEN_CYCLE_PROP + " is not set");
        }
        return Long.parseLong(val.trim());
    }

    /**
     * @param prop tag-sync properties
     * @return the source implementation class name; the aliases "atlas" and
     *         "file" are expanded, any other value is returned unchanged
     */
    public static String getTagSourceClassName(Properties prop) {
        String val = prop.getProperty(TAGSYNC_SOURCE_CLASS_PROP);
        if (StringUtils.equalsIgnoreCase(val, "atlas")) {
            return "org.apache.ranger.tagsync.source.atlas.TagAtlasSource";
        }
        if (StringUtils.equalsIgnoreCase(val, "file")) {
            return "org.apache.ranger.tagsync.source.file.TagFileSource";
        }
        return val;
    }

    /**
     * @param prop tag-sync properties
     * @return the sink implementation class name; the alias "tagadmin" is
     *         expanded, any other value is returned unchanged
     */
    public static String getTagSinkClassName(Properties prop) {
        String val = prop.getProperty(TAGSYNC_SINK_CLASS_PROP);
        if (StringUtils.equalsIgnoreCase(val, "tagadmin")) {
            return "org.apache.ranger.tagsync.sink.tagadmin.TagRESTSink";
        }
        return val;
    }

    public static String getTagAdminRESTUrl(Properties prop) {
        return prop.getProperty(TAGSYNC_TAGADMIN_REST_URL_PROP);
    }

    public static String getTagAdminRESTSslConfigFile(Properties prop) {
        return prop.getProperty(TAGSYNC_TAGADMIN_REST_SSL_CONFIG_FILE_PROP);
    }

    public static String getTagAdminUserName(Properties prop) {
        return prop.getProperty(TAGSYNC_TAGADMIN_SSL_BASICAUTH_USERNAME_PROP);
    }

    public static String getTagAdminPassword(Properties prop) {
        return prop.getProperty(TAGSYNC_TAGADMIN_SSL_BASICAUTH_PASSWORD_PROP);
    }

    public static String getTagSourceFileName(Properties prop) {
        return prop.getProperty(TAGSYNC_FILESOURCE_FILENAME_PROP);
    }

    public static String getAtlasEndpoint(Properties prop) {
        return prop.getProperty(TAGSYNC_ATLASSOURCE_ENDPOINT_PROP);
    }

    /** Always returns the empty string; no property is consulted. */
    public static String getAtlasSslConfigFileName(Properties prop) {
        return "";
    }

    /**
     * Maps an Atlas component/instance pair to a Ranger service name.
     *
     * @param componentName component name used in the property key
     * @param instanceName  instance name used in the property key
     * @param prop          tag-sync properties
     * @return the value of
     *         {@code ranger.tagsync.atlas.<component>.instance.<instance>.ranger.service},
     *         or {@code <instance>_<component>} when that property is blank
     */
    public static String getServiceName(String componentName, String instanceName, Properties prop) {
        String propName = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName
                + ".instance." + instanceName
                + TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX;
        String val = prop.getProperty(propName);
        if (StringUtils.isBlank(val)) {
            val = instanceName + TAGSYNC_DEFAULT_CLUSTERNAME_AND_COMPONENTNAME_SEPARATOR + componentName;
        }
        return val;
    }

    private TagSyncConfig() {
        super(false);
        init();
    }

    // Defaults first, then the site file.
    private void init() {
        readConfigFile(DEFAULT_CONFIG_FILE);
        readConfigFile(CONFIG_FILE);
    }

    // Registers fileName as a configuration resource if it can be located; otherwise only warns.
    private void readConfigFile(String fileName) {
        if (StringUtils.isBlank(fileName)) {
            LOG.error("Configuration fileName is null");
            return;
        }

        String fName = getResourceFileName(fileName);
        if (StringUtils.isBlank(fName)) {
            LOG.warn("Cannot find configuration file " + fileName + " in the classpath");
        } else {
            LOG.info("Loading configuration from " + fName);
            addResource(fileName);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java new file mode 100644 index 0000000..0235581 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.ranger.tagsync.process;

import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;
import org.apache.ranger.tagsync.model.TagSink;
import org.apache.ranger.tagsync.model.TagSource;

import java.util.Map;
import java.util.Properties;

/**
 * Entry point of the tag-sync process: creates the configured
 * {@link TagSource} and {@link TagSink}, starts the source and then idles
 * until {@link #shutdown(String)} is called.
 */
public class TagSynchronizer implements Runnable {

    private static final Logger LOG = Logger.getLogger(TagSynchronizer.class);

    private final static int MAX_INIT_RETRIES = 5;

    // Printed in place of the value of any property whose name contains "password".
    private static final String MASKED_VALUE = "********";

    // volatile: run() polls this flag in a loop while shutdown() sets it from another thread.
    private volatile boolean shutdownFlag = false;
    private TagSink tagSink = null;
    private TagSource tagSource = null;
    private Properties properties = null;


    public static void main(String[] args) {

        TagSyncConfig config = TagSyncConfig.getInstance();
        Properties props = config.getProperties();

        TagSynchronizer tagSynchronizer = new TagSynchronizer(props);

        tagSynchronizer.run();
    }

    /**
     * @param properties tag-sync configuration; when {@code null} or empty an
     *                   error is logged and an empty set is used instead
     */
    public TagSynchronizer(Properties properties) {
        if (properties == null || MapUtils.isEmpty(properties)) {
            LOG.error("TagSynchronizer initialized with null properties!");
            this.properties = new Properties();
        } else {
            this.properties = properties;
        }
    }

    public TagSink getTagSink() {
        return tagSink;
    }

    public TagSource getTagSource() {
        return tagSource;
    }

    /**
     * Initializes source and sink (with retries), starts the source thread and
     * sleeps in a loop until shutdown is requested; then interrupts the source
     * thread and waits for it to finish. Any error is logged, never rethrown.
     */
    @Override
    public void run() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> TagSynchronizer.run()");
        }
        try {
            long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);

            boolean initDone = initLoop();

            if (initDone) {

                Thread tagSourceThread = tagSource.start();

                if (tagSourceThread != null) {
                    while (!shutdownFlag) {
                        try {
                            LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
                            Thread.sleep(sleepTimeBetweenCycleInMillis);
                        } catch (InterruptedException e) {
                            // The interrupt flag is deliberately not restored here: the loop keeps
                            // running until shutdown() is called, and a set flag would make every
                            // following sleep() throw immediately.
                            LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e);
                        }
                    }
                    if (shutdownFlag) {
                        LOG.info("Interrupting tagSourceThread...");
                        tagSourceThread.interrupt();
                        try {
                            tagSourceThread.join();
                        } catch (InterruptedException interruptedException) {
                            LOG.error("tagSourceThread.join() was interrupted");
                            // Nothing left to wait for; keep the interrupt visible to the caller.
                            Thread.currentThread().interrupt();
                        }
                    }
                } else {
                    LOG.error("Could not start tagSource monitoring thread");
                }
            } else {
                LOG.error("Failed to initialize TagSynchonizer after " + MAX_INIT_RETRIES + " retries. Exiting thread");
            }

        } catch (Throwable t) {
            LOG.error("tag-sync thread got an error", t);
        } finally {
            LOG.error("Shutting down the tag-sync thread");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== TagSynchronizer.run()");
        }
    }

    /**
     * Calls {@link #init()} up to {@code MAX_INIT_RETRIES} times. After each
     * failure it sleeps for the configured cycle time and reloads the
     * properties through {@link TagSyncConfig#getInstance()}.
     *
     * @return {@code true} as soon as one attempt succeeds, {@code false} if all fail
     */
    public boolean initLoop() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> TagSynchronizer.initLoop()");
        }
        boolean ret = false;

        long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);

        for (int initRetries = 0; initRetries < MAX_INIT_RETRIES && !ret; initRetries++) {

            printConfigurationProperties();

            ret = init();

            if (!ret) {
                LOG.error("Failed to initialize TAG source/sink. Will retry after " + sleepTimeBetweenCycleInMillis + " milliseconds.");
                try {
                    LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
                    Thread.sleep(sleepTimeBetweenCycleInMillis);
                    properties = TagSyncConfig.getInstance().getProperties();
                } catch (Exception e) {
                    LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to initialize tag source/sink", e);
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== TagSynchronizer.initLoop()");
        }
        return ret;
    }

    /**
     * Instantiates the configured sink and source classes reflectively,
     * initializes both and hands the sink to the source.
     *
     * @return {@code true} only when both {@code initialize} calls return
     *         {@code true}; any exception is logged and reported as {@code false}
     */
    public boolean init() {

        if (LOG.isDebugEnabled()) {
            LOG.debug("==> TagSynchronizer.init()");
        }
        boolean ret = false;
        try {
            LOG.info("Initializing TAG source and sink");
            // Initialize tagSink and tagSource
            String tagSourceClassName = TagSyncConfig.getTagSourceClassName(properties);
            String tagSinkClassName = TagSyncConfig.getTagSinkClassName(properties);

            if (LOG.isDebugEnabled()) {
                LOG.debug("tagSourceClassName=" + tagSourceClassName + ", tagSinkClassName=" + tagSinkClassName);
            }

            // asSubclass() fails with ClassCastException right here if a configured class
            // has the wrong type, instead of relying on an unchecked cast.
            Class<? extends TagSource> tagSourceClass = Class.forName(tagSourceClassName).asSubclass(TagSource.class);
            Class<? extends TagSink> tagSinkClass = Class.forName(tagSinkClassName).asSubclass(TagSink.class);

            tagSink = tagSinkClass.newInstance();
            tagSource = tagSourceClass.newInstance();

            if (LOG.isDebugEnabled()) {
                LOG.debug("Created instance of " + tagSourceClassName + ", " + tagSinkClassName);
            }

            ret = tagSink.initialize(properties) && tagSource.initialize(properties);

            tagSource.setTagSink(tagSink);

            LOG.info("Done initializing TAG source and sink");
        } catch (Throwable t) {
            LOG.error("Failed to initialize TAG source/sink. Error details: ", t);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== TagSynchronizer.init(), result=" + ret);
        }

        return ret;
    }

    /**
     * Asks the {@link #run()} loop to stop; takes effect the next time the
     * loop re-checks the flag after its current sleep.
     *
     * @param reason free-text reason, only logged
     */
    public void shutdown(String reason) {
        LOG.info("Received shutdown(), reason=" + reason);
        this.shutdownFlag = true;
    }

    /**
     * Logs every configuration property at INFO level. Values of properties
     * whose name contains "password" (any case) are masked so that credentials
     * do not end up in the log.
     */
    public void printConfigurationProperties() {
        LOG.info("--------------------------------");
        LOG.info("");
        LOG.info("Ranger-TagSync Configuration: {\n");
        if (MapUtils.isNotEmpty(properties)) {
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                Object key = entry.getKey();
                Object value = isSensitive(key) ? MASKED_VALUE : entry.getValue();
                LOG.info("\tProperty-Name:" + key);
                LOG.info("\tProperty-Value:" + value);
                LOG.info("\n");
            }
        }
        LOG.info("\n}");
        LOG.info("");
        LOG.info("--------------------------------");
    }

    // True for property names that look like they hold a password.
    private static boolean isSensitive(Object key) {
        return key != null && key.toString().toLowerCase(java.util.Locale.ROOT).contains("password");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java new file mode 100644 index 0000000..e1bcfbb --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagRESTSink.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.sink.tagadmin; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.admin.client.datatype.RESTResponse; +import org.apache.ranger.tagsync.model.TagSink; +import org.apache.ranger.plugin.model.*; +import org.apache.ranger.plugin.store.PList; +import org.apache.ranger.plugin.store.ServiceStore; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.apache.ranger.plugin.util.SearchFilter; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class TagRESTSink implements TagSink { + private static final Log LOG = LogFactory.getLog(TagRESTSink.class); + + private static final String REST_PREFIX = "/service"; + private static final String MODULE_PREFIX = "/tags"; + + private static final String REST_MIME_TYPE_JSON = "application/json" ; + private static final String REST_URL_TAGDEFS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdefs/" ; + private static final String REST_URL_TAGDEF_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagdef/" ; + private static final String REST_URL_SERVICERESOURCES_RESOURCE = REST_PREFIX + MODULE_PREFIX + "resources/" ; + private static final String REST_URL_SERVICERESOURCE_RESOURCE = REST_PREFIX + MODULE_PREFIX + "resource/" ; + 
private static final String REST_URL_TAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tags/" ; + private static final String REST_URL_TAG_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tag/" ; + private static final String REST_URL_TAGRESOURCEMAP_IDS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/tagresourcemapids/"; + private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/"; + + private RangerRESTClient tagRESTClient = null; + + @Override + public void init() {} + + @Override + public boolean initialize(Properties properties) { + if(LOG.isDebugEnabled()) { + LOG.debug("==> TagRESTSink.initialize()"); + } + + boolean ret = false; + + String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); + String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties); + String userName = TagSyncConfig.getTagAdminUserName(properties); + String password = TagSyncConfig.getTagAdminPassword(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("restUrl=" + restUrl); + LOG.debug("sslConfigFile=" + sslConfigFile); + LOG.debug("userName=" + userName); + LOG.debug("password=" + password); + } + tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile); + if (tagRESTClient != null) { + tagRESTClient.setBasicAuthInfo(userName, password); + ret = true; + } else { + LOG.error("Could not create RangerRESTClient"); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== TagRESTSink.initialize(), result=" + ret); + } + return ret; + } + + @Override + public void setServiceStore(ServiceStore svcStore) { + + } + + @Override + public RangerTagDef createTagDef(RangerTagDef tagDef) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> createTagDef(" + tagDef + ")"); + } + + RangerTagDef ret = null; + + WebResource webResource = createWebResource(REST_URL_TAGDEFS_RESOURCE); + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, 
tagRESTClient.toJson(tagDef)); + + if(response != null && response.getStatus() == 200) { + ret = response.getEntity(RangerTagDef.class); + } else { + LOG.error("RangerAdmin REST call returned with response={" + response +"}"); + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== createTagDef(" + tagDef + "): " + ret); + } + + return ret; + } + + @Override + public RangerTagDef updateTagDef(RangerTagDef TagDef) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public void deleteTagDefByName(String name) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public void deleteTagDef(Long id) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> deleteTagDef(" + id + ")"); + } + WebResource webResource = createWebResource(REST_URL_TAGDEF_RESOURCE + Long.toString(id)); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class); + + if(response != null && response.getStatus() == 204) { + } else { + LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== deleteTagDef(" + id + ")"); + } + } + + @Override + public RangerTagDef getTagDef(Long id) throws Exception { + throw new Exception("Not implemented"); + + } + + @Override + public RangerTagDef getTagDefByGuid(String guid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public RangerTagDef getTagDefByName(String name) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTagDef> getTagDefs(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public PList<RangerTagDef> 
getPaginatedTagDefs(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<String> getTagTypes() throws Exception { + // TODO Auto-generated method stub + return null; + } + + + @Override + public RangerTag createTag(RangerTag tag) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> createTag(" + tag + ")"); + } + + RangerTag ret = null; + + WebResource webResource = createWebResource(REST_URL_TAGS_RESOURCE); + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, tagRESTClient.toJson(tag)); + + if(response != null && response.getStatus() == 200) { + ret = response.getEntity(RangerTag.class); + } else { + LOG.error("RangerAdmin REST call returned with response={" + response +"}"); + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== createTag(" + tag + "): " + ret); + } + + return ret; + } + + @Override + public RangerTag updateTag(RangerTag tag) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public void deleteTag(Long id) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> deleteTag(" + id + ")"); + } + WebResource webResource = createWebResource(REST_URL_TAG_RESOURCE + Long.toString(id)); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class); + + if(response != null && response.getStatus() == 204) { + } else { + LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== deleteTag(" + id + ")"); + } + } + + @Override + public RangerTag getTag(Long id) throws Exception { + throw new Exception("Not implemented"); + } + + @Override 
+ public RangerTag getTagByGuid(String guid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTag> getTagsByType(String name) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<Long> getTagIdsForResourceId(Long resourceId) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTag> getTagsForResourceId(Long resourceId) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTag> getTagsForResourceGuid(String resourceGuid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTag> getTags(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public PList<RangerTag> getPaginatedTags(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + + @Override + public RangerServiceResource createServiceResource(RangerServiceResource resource) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> createServiceResource(" + resource + ")"); + } + + RangerServiceResource ret = null; + + WebResource webResource = createWebResource(REST_URL_SERVICERESOURCES_RESOURCE); + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class, tagRESTClient.toJson(resource)); + + if(response != null && response.getStatus() == 200) { + ret = response.getEntity(RangerServiceResource.class); + } else { + LOG.error("RangerAdmin REST call returned with response={" + response +"}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== createServiceResource(" + resource + "): " + ret); + } + + return ret; + } + + @Override + public RangerServiceResource updateServiceResource(RangerServiceResource resource) throws Exception { + 
throw new Exception("Not implemented"); + } + + @Override + public void deleteServiceResource(Long id) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> deleteServiceResource(" + id + ")"); + } + WebResource webResource = createWebResource(REST_URL_SERVICERESOURCE_RESOURCE + Long.toString(id)); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).delete(ClientResponse.class); + + if(response != null && response.getStatus() == 204) { + } else { + LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== deleteServiceResource(" + id + ")"); + } + } + + @Override + public RangerServiceResource getServiceResource(Long id) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public RangerServiceResource getServiceResourceByGuid(String guid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerServiceResource> getServiceResourcesByService(String serviceName) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public RangerServiceResource getServiceResourceByResourceSignature(String resourceSignature) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerServiceResource> getServiceResources(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public PList<RangerServiceResource> getPaginatedServiceResources(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + + @Override + public RangerTagResourceMap createTagResourceMap(RangerTagResourceMap tagResourceMap) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> createTagResourceMap(" + tagResourceMap + ")"); + } + + RangerTagResourceMap ret = 
null; + + WebResource webResource = createWebResource(REST_URL_TAGRESOURCEMAP_IDS_RESOURCE) + .queryParam("tag-id", Long.toString(tagResourceMap.getTagId())) + .queryParam("resource-id", Long.toString(tagResourceMap.getResourceId())); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).post(ClientResponse.class); + + if(response != null && response.getStatus() == 200) { + ret = response.getEntity(RangerTagResourceMap.class); + } else { + LOG.error("RangerAdmin REST call returned with response={" + response +"}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + throw new Exception(resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== createTagResourceMap(" + tagResourceMap + "): " + ret); + } + + return ret; + } + + @Override + public void deleteTagResourceMap(Long id) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public void uploadServiceTags(ServiceTags serviceTags) throws Exception { + if(LOG.isDebugEnabled()) { + LOG.debug("==> uploadServiceTags()"); + } + WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE); + + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags)); + + if(response != null && response.getStatus() == 204) { + } else { + LOG.error("RangerAdmin REST call returned with response={" + response + "}"); + + RESTResponse resp = RESTResponse.fromClientResponse(response); + + LOG.error("Upload of service-tags failed with message " + resp.getMessage()); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("<== uploadServiceTags()"); + } + } + + @Override + public RangerTagResourceMap getTagResourceMap(Long id) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public RangerTagResourceMap getTagResourceMapByGuid(String guid) throws Exception { + throw new Exception("Not implemented"); + } + 
+ @Override + public List<RangerTagResourceMap> getTagResourceMapsForTagId(Long tagId) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTagResourceMap> getTagResourceMapsForTagGuid(String tagGuid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTagResourceMap> getTagResourceMapsForResourceId(Long resourceId) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTagResourceMap> getTagResourceMapsForResourceGuid(String resourceGuid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public RangerTagResourceMap getTagResourceMapForTagAndResourceId(Long tagId, Long resourceId) throws Exception { + throw new Exception("Not implemented"); + } + + + @Override + public RangerTagResourceMap getTagResourceMapForTagAndResourceGuid(String tagGuid, String resourceGuid) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public List<RangerTagResourceMap> getTagResourceMaps(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + @Override + public PList<RangerTagResourceMap> getPaginatedTagResourceMaps(SearchFilter filter) throws Exception { + throw new Exception("Not implemented"); + } + + + @Override + public ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion) throws Exception { + throw new Exception("Not implemented"); + } + + private WebResource createWebResource(String url) { + return createWebResource(url, null); + } + + private WebResource createWebResource(String url, SearchFilter filter) { + WebResource ret = tagRESTClient.getResource(url); + + if(filter != null && !MapUtils.isEmpty(filter.getParams())) { + for(Map.Entry<String, String> e : filter.getParams().entrySet()) { + String name = e.getKey(); + String value = e.getValue(); + + ret.queryParam(name, value); + } + } + + return ret; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java new file mode 100644 index 0000000..973b7fb --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.tagsync.source.atlas; + +import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.typesystem.api.Entity; +import org.apache.atlas.typesystem.api.Trait; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.plugin.model.RangerTag; +import org.apache.ranger.plugin.model.RangerTagDef; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.util.*; + +class AtlasNotificationMapper { + private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class); + + public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; + public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table"; + public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column"; + + public static final String RANGER_TYPE_HIVE_DB = "database"; + public static final String RANGER_TYPE_HIVE_TABLE = "table"; + public static final String RANGER_TYPE_HIVE_COLUMN = "column"; + + public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String QUALIFIED_NAME_FORMAT_DELIMITER_STRING = "\\."; + + + private static Properties properties = null; + + public static ServiceTags processEntityNotification(EntityNotification entityNotification, Properties props) { + + ServiceTags ret = null; + properties = props; + + try { + if (isEntityMappable(entityNotification.getEntity())) { + ret = createServiceTags(entityNotification); + } else { + LOG.info("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName()); + } + } catch (Exception exception) { + LOG.error("createServiceTags() failed!! 
", exception); + } + return ret; + } + + static private boolean isEntityMappable(Entity entity) { + boolean ret = false; + + String entityTypeName = entity.getTypeName(); + + if (StringUtils.isNotBlank(entityTypeName)) { + if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB) || + StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) || + StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { + ret = true; + } + } + return ret; + } + + static private ServiceTags createServiceTags(EntityNotification entityNotification) throws Exception { + + ServiceTags ret = null; + + EntityNotification.OperationType opType = entityNotification.getOperationType(); + Entity entity = entityNotification.getEntity(); + + String opName = entityNotification.getOperationType().name(); + switch (opType) { + case ENTITY_CREATED: { + ret = getServiceTags(entity, opType); + break; + } + case ENTITY_UPDATED: { + ret = handleEntityUpdate(entity); + break; + } + case TRAIT_ADDED: { + ret = getServiceTags(entity, opType); + break; + } + case TRAIT_DELETED: { + ret = handleTraitDelete(entity); + break; + } + default: + LOG.error("Unknown notification received. 
Will not be handled, notificationType=" + opName); + } + + return ret; + } + + static private ServiceTags getServiceTags(Entity entity, EntityNotification.OperationType opType) throws Exception { + ServiceTags ret = null; + + + List<RangerServiceResource> serviceResources = new ArrayList<RangerServiceResource>(); + + RangerServiceResource serviceResource = getServiceResource(entity, opType); + serviceResources.add(serviceResource); + + Map<Long, RangerTag> tags = getTags(entity); + + Map<Long, RangerTagDef> tagDefs = getTagDefs(tags, EntityNotification.OperationType.ENTITY_CREATED); + + Map<Long, List<Long>> resourceIdToTagIds = null; + + if (MapUtils.isNotEmpty(tags)) { + resourceIdToTagIds = new HashMap<Long, List<Long>>(); + + List<Long> tagList = new ArrayList<Long>(); + for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { + tagList.add(entry.getKey()); + } + resourceIdToTagIds.put(1L, tagList); + } + + ret = new ServiceTags(); + + ret.setOp(ServiceTags.OP_ADD_OR_UPDATE); + ret.setServiceName(serviceResource.getServiceName()); + ret.setServiceResources(serviceResources); + ret.setTagDefinitions(tagDefs); + ret.setTags(tags); + ret.setResourceToTagIds(resourceIdToTagIds); + + return ret; + } + + + static private RangerServiceResource getServiceResource(Entity entity, EntityNotification.OperationType opType) throws Exception { + + RangerServiceResource ret = null; + + Map<String, RangerPolicy.RangerPolicyResource> elements = null; + String serviceName = null; + + if (opType == EntityNotification.OperationType.ENTITY_CREATED) { + + elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); + + //String[] components = getQualifiedNameComponents(entity); + String[] components = getTempNameComponents(entity); + // components should contain qualifiedName, instanceName, dbName, tableName, columnName in that order + + + String entityTypeName = entity.getTypeName(); + + String instanceName, dbName, tableName, columnName; + + if (components.length > 1) { + 
instanceName = components[1]; + serviceName = getServiceName(instanceName, entityTypeName); + } + + if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) { + if (components.length > 2) { + dbName = components[2]; + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + + } else { + LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + components[0]); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) { + if (components.length > 3) { + dbName = components[2]; + tableName = components[3]; + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + components[0]); + } + } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { + LOG.error("HIVE_COLUMN creation is not handled."); + throw new Exception("HIVE_COLUMN entity-creation not implemented"); + + /* + if (components.length > 4) { + dbName = components[2]; + tableName = components[3]; + columnName = components[4]; + RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); + elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); + elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); + RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName); + elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource); + } else { + LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + components[0]); + } + */ + + } + } 
+ + ret = new RangerServiceResource(); + ret.setGuid(entity.getId().getGuid()); + ret.setId(1L); + ret.setServiceName(serviceName); + ret.setResourceElements(elements); + + return ret; + } + + static private Map<Long, RangerTag> getTags(Entity entity) { + Map<Long, RangerTag> ret = null; + + Map<String, ? extends Trait> traits = entity.getTraits(); + + if (MapUtils.isNotEmpty(traits)) { + ret = new HashMap<Long, RangerTag>(); + long index = 1; + + for (Map.Entry<String, ? extends Trait> entry : traits.entrySet()) { + String traitName = entry.getKey(); + Trait trait = entry.getValue(); + + Map<String, Object> attrValues = trait.getValues(); + + Map<String, String> tagAttrValues = new HashMap<String, String>(); + + for (Map.Entry<String, Object> attrValueEntry : attrValues.entrySet()) { + String attrName = attrValueEntry.getKey(); + Object attrValue = attrValueEntry.getValue(); + try { + String strValue = String.class.cast(attrValue); + tagAttrValues.put(attrName, strValue); + } catch (ClassCastException exception) { + LOG.error("Cannot cast attribute-value to String, skipping... 
attrName=" + attrName); + } + } + + RangerTag tag = new RangerTag(); + + tag.setGuid(entity.getId().getGuid() + "-" + traitName); + tag.setType(traitName); + tag.setAttributes(tagAttrValues); + + ret.put(index++, tag); + } + } + + return ret; + } + + static private Map<Long, RangerTagDef> getTagDefs(Map<Long, RangerTag> tags, EntityNotification.OperationType opType) { + + Map<Long, RangerTagDef> ret = null; + + if (opType == EntityNotification.OperationType.ENTITY_CREATED || opType == EntityNotification.OperationType.TRAIT_ADDED) { + if (MapUtils.isNotEmpty(tags)) { + ret = new HashMap<Long, RangerTagDef>(); + for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { + RangerTagDef tagDef = new RangerTagDef(); + tagDef.setName(entry.getValue().getType()); + tagDef.setId(entry.getKey()); + ret.put(entry.getKey(), tagDef); + } + } + } + + return ret; + } + + static private String[] getQualifiedNameComponents(Entity entity) { + String ret[] = new String[5]; + + if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) { + ret[1] = getAttribute(entity.getValues(), "clusterName", String.class); + ret[2] = getAttribute(entity.getValues(), "name", String.class); + ret[3] = null; + ret[0] = ret[1] + "." 
+ ret[2]; + } else { + String qualifiedName = getAttribute(entity.getValues(), ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + String nameHierarchy[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING); + + int hierarchyLevels = nameHierarchy.length; + + if (LOG.isDebugEnabled()) { + LOG.debug("----- Entity-Id:" + entity.getId().getGuid()); + LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); + LOG.debug("----- Entity-Qualified-Name:" + qualifiedName); + LOG.debug("----- Entity-Qualified-Name-Components -----"); + for (int i = 0; i < hierarchyLevels; i++) { + LOG.debug("----- Index:" + i + " Value:" + nameHierarchy[i]); + } + } + + int i; + for (i = 0; i < ret.length; i++) { + ret[i] = null; + } + ret[0] = qualifiedName; + + for (i = 0; i < hierarchyLevels; i++) { + ret[i + 1] = nameHierarchy[i]; + } + } + return ret; + } + + static private String getServiceName(String instanceName, String entityTypeName) { + // Parse entityTypeName to get the Apache-component Name + String apacheComponents[] = entityTypeName.split("_"); + String apacheComponent = null; + if (apacheComponents.length > 0) { + apacheComponent = apacheComponents[0].toLowerCase(); + } + + return TagSyncConfig.getServiceName(apacheComponent, instanceName, properties); + } + + static private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { + return type.cast(map.get(name)); + } + + // Temporary stuff, until qualifiedName is implemented by Atlas + static private String[] getTempNameComponents(Entity entity) { + String ret[] = new String[4]; + if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) { + ret[1] = getAttribute(entity.getValues(), "clusterName", String.class); + ret[2] = getAttribute(entity.getValues(), "name", String.class); + ret[3] = null; + ret[0] = ret[1] + "." 
+ ret[2]; + } else if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_TABLE)) { + String qualifiedName = getAttribute(entity.getValues(), "name", String.class); + String nameHierarchy[] = qualifiedName.split("\\.@"); + + int hierarchyLevels = nameHierarchy.length; + + if (LOG.isDebugEnabled()) { + LOG.debug("----- Entity-Id:" + entity.getId().getGuid()); + LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); + LOG.debug("----- Entity-Qualified-Name:" + qualifiedName); + LOG.debug("----- Entity-Qualified-Name-Components -----"); + for (int i = 0; i < hierarchyLevels; i++) { + LOG.debug("----- Index:" + i + " Value:" + nameHierarchy[i]); + } + } + + int i; + for (i = 0; i < ret.length; i++) { + ret[i] = null; + } + ret[0] = qualifiedName; + if (hierarchyLevels > 2) { + ret[1] = nameHierarchy[2]; + } + if (hierarchyLevels > 1) { + ret[2] = nameHierarchy[1]; + } + if (hierarchyLevels > 0) { + ret[3] = nameHierarchy[0]; + } + + + } + return ret; + } + + + static private ServiceTags handleEntityUpdate(Entity entity) throws Exception { + + throw new Exception("Not implemented"); + + } + + static private ServiceTags handleTraitDelete(Entity entity) throws Exception { + + throw new Exception("Not implemented"); + } +}
