Repository: incubator-ranger Updated Branches: refs/heads/master fec84603f -> 5b9c094ff
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 666c2c8..803a8a9 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -23,76 +23,57 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ranger.plugin.model.RangerPolicy; import org.apache.ranger.plugin.model.RangerServiceResource; import org.apache.ranger.plugin.model.RangerTag; import org.apache.ranger.plugin.model.RangerTagDef; import org.apache.ranger.plugin.util.ServiceTags; -import org.apache.ranger.tagsync.process.TagSyncConfig; import java.util.*; public class AtlasNotificationMapper { private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class); - public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; - public static final String ENTITY_TYPE_HIVE_TABLE = "hive_table"; - public static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column"; - - public static final String RANGER_TYPE_HIVE_DB = "database"; - public static final String RANGER_TYPE_HIVE_TABLE = "table"; - public static final String RANGER_TYPE_HIVE_COLUMN = "column"; - - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE = "name"; - public static final String QUALIFIED_NAME_FORMAT_DELIMITER_STRING = "\\."; - public static final String QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING = "@"; - - private static Properties properties = null; - - public static ServiceTags processEntityNotification(EntityNotification entityNotification, Properties props) { + public static ServiceTags processEntityNotification(EntityNotification entityNotification) { ServiceTags ret = null; - properties = props; - try { - IReferenceableInstance entity = entityNotification.getEntity(); - - if (isEntityMappable(entity)) { - ret = createServiceTags(entityNotification); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName()); + if (isNotificationHandled(entityNotification)) { + try { + IReferenceableInstance entity = entityNotification.getEntity(); + + if (AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) { + AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entityNotification.getEntity(), entityNotification.getAllTraits()); + ret = buildServiceTags(entityWithTraits, 1L, 1L, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ranger not interested in Entity Notification for entity-type " + entityNotification.getEntity().getTypeName()); + } } + } catch (Exception exception) { + LOG.error("createServiceTags() failed!! ", exception); } - } catch (Exception exception) { - LOG.error("createServiceTags() failed!! ", exception); } return ret; } - static private boolean isEntityMappable(IReferenceableInstance entity) { - boolean ret = false; - - String entityTypeName = entity.getTypeName(); + public static Map<String, ServiceTags> processEntitiesWithTraits(List<AtlasEntityWithTraits> atlasEntities) { + Map<String, ServiceTags> ret = null; - if (StringUtils.isNotBlank(entityTypeName)) { - if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB) || - StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) || - StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { - ret = true; - } + try { + ret = buildServiceTags(atlasEntities); + } catch (Exception exception) { + LOG.error("Failed to build serviceTags", exception); } + return ret; } - static private ServiceTags createServiceTags(EntityNotification entityNotification) throws Exception { - - ServiceTags ret = null; + static private boolean isNotificationHandled(EntityNotification entityNotification) { + boolean ret = false; EntityNotification.OperationType opType = entityNotification.getOperationType(); @@ -101,17 +82,10 @@ public class AtlasNotificationMapper { LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); break; } - case ENTITY_UPDATE: { - ret = getServiceTags(entityNotification); - if (MapUtils.isEmpty(ret.getTags())) { - LOG.debug("No traits associated with this entity update notification. Ignoring it altogether"); - ret = null; - } - break; - } + case ENTITY_UPDATE: case TRAIT_ADD: case TRAIT_DELETE: { - ret = getServiceTags(entityNotification); + ret = true; break; } default: @@ -121,130 +95,116 @@ public class AtlasNotificationMapper { return ret; } - static private ServiceTags getServiceTags(EntityNotification entityNotification) throws Exception { - ServiceTags ret = null; + static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception { - IReferenceableInstance entity = entityNotification.getEntity(); + Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>(); - List<RangerServiceResource> serviceResources = new ArrayList<RangerServiceResource>(); + long serviceResourceIndex = 1L; + long tagIndex = 1L; - RangerServiceResource serviceResource = getServiceResource(entity); - serviceResources.add(serviceResource); + for (AtlasEntityWithTraits element : entitiesWithTraits) { - Map<Long, RangerTag> tags = getTags(entityNotification); + ServiceTags serviceTags = buildServiceTags(element, serviceResourceIndex, tagIndex, ret); - Map<Long, RangerTagDef> tagDefs = getTagDefs(tags); + serviceResourceIndex++; - Map<Long, List<Long>> resourceIdToTagIds = null; + tagIndex += CollectionUtils.size(serviceTags.getTags()); - resourceIdToTagIds = new HashMap<Long, List<Long>>(); - List<Long> tagList = new ArrayList<Long>(); + } + // Remove duplicate tag definitions + for (Map.Entry<String, ServiceTags> serviceTagsMapEntry : ret.entrySet()){ - if (MapUtils.isNotEmpty(tags)) { - resourceIdToTagIds = new HashMap<Long, List<Long>>(); + Map<Long, RangerTagDef> allTagDefs = serviceTagsMapEntry.getValue().getTagDefinitions(); - for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { - tagList.add(entry.getKey()); + Map<String, String> tagTypeIndex = new HashMap<String, String>(); + Map<Long, RangerTagDef> uniqueTagDefs = new HashMap<Long, RangerTagDef>(); + + for (Map.Entry<Long, RangerTagDef> entry : allTagDefs.entrySet()) { + String tagTypeName = entry.getValue().getName(); + + if (tagTypeIndex.get(tagTypeName) == null) { + tagTypeIndex.put(tagTypeName, tagTypeName); + uniqueTagDefs.put(entry.getKey(), entry.getValue()); + } } + serviceTagsMapEntry.getValue().setTagDefinitions(uniqueTagDefs); } - resourceIdToTagIds.put(1L, tagList); + return ret; + } + + static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, long index, long tagIndex, Map<String, ServiceTags> serviceTagsMap) throws Exception { + ServiceTags ret = null; - ret = new ServiceTags(); + IReferenceableInstance entity = entityWithTraits.getEntity(); - ret.setOp(ServiceTags.OP_ADD_OR_UPDATE); - ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE); - ret.setServiceName(serviceResource.getServiceName()); - ret.setServiceResources(serviceResources); - ret.setTagDefinitions(tagDefs); - ret.setTags(tags); - ret.setResourceToTagIds(resourceIdToTagIds); + RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); - return ret; - } + if (serviceResource != null) { + serviceResource.setId(index); - static private RangerServiceResource getServiceResource(IReferenceableInstance entity) throws Exception { + String serviceName = serviceResource.getServiceName(); - RangerServiceResource ret = null; + Map<Long, RangerTag> tags = getTags(entityWithTraits, tagIndex); - Map<String, RangerPolicy.RangerPolicyResource> elements = null; - String serviceName = null; + Map<Long, RangerTagDef> tagDefs = getTagDefs(tags); + Map<Long, List<Long>> resourceIdToTagIds = null; - elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); + resourceIdToTagIds = new HashMap<Long, List<Long>>(); + List<Long> tagList = new ArrayList<Long>(); + + if (MapUtils.isNotEmpty(tags)) { + resourceIdToTagIds = new HashMap<Long, List<Long>>(); - List<String> components = getQualifiedNameComponents(entity); - // components should contain qualifiedName, clusterName, dbName, tableName, columnName in that order + for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { + tagList.add(entry.getKey()); + } + } - String entityTypeName = entity.getTypeName(); + resourceIdToTagIds.put(index, tagList); - String qualifiedName = components.get(0); + ret = createOrGetServiceTags(serviceTagsMap, serviceName); - String clusterName, dbName, tableName, columnName; + ret.getServiceResources().add(serviceResource); + ret.getTagDefinitions().putAll(tagDefs); + ret.getTags().putAll(tags); + ret.getResourceToTagIds().putAll(resourceIdToTagIds); - if (components.size() > 1) { - clusterName = components.get(1); - serviceName = getServiceName(clusterName, entityTypeName); + } else { + LOG.error("AtlasResourceMapper not found for entity-type:" + entity.getTypeName()); } - if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_DB)) { - if (components.size() > 2) { - dbName = components.get(2); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); + return ret; + } + + static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) { + ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName); - } else { - LOG.error("invalid qualifiedName for HIVE_DB, qualifiedName=" + qualifiedName); - } - } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE)) { - if (components.size() > 3) { - dbName = components.get(2); - tableName = components.get(3); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); - RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); - elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); - } else { - LOG.error("invalid qualifiedName for HIVE_TABLE, qualifiedName=" + qualifiedName); - } - } else if (StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_COLUMN)) { - if (components.size() > 4) { - dbName = components.get(2); - tableName = components.get(3); - columnName = components.get(4); - RangerPolicy.RangerPolicyResource dbPolicyResource = new RangerPolicy.RangerPolicyResource(dbName); - elements.put(RANGER_TYPE_HIVE_DB, dbPolicyResource); - RangerPolicy.RangerPolicyResource tablePolicyResource = new RangerPolicy.RangerPolicyResource(tableName); - elements.put(RANGER_TYPE_HIVE_TABLE, tablePolicyResource); - RangerPolicy.RangerPolicyResource columnPolicyResource = new RangerPolicy.RangerPolicyResource(columnName); - elements.put(RANGER_TYPE_HIVE_COLUMN, columnPolicyResource); - } else { - LOG.error("invalid qualifiedName for HIVE_COLUMN, qualifiedName=" + qualifiedName); + if (ret == null) { + ret = new ServiceTags(); + + if (serviceTagsMap != null) { + serviceTagsMap.put(serviceName, ret); } + ret.setOp(ServiceTags.OP_ADD_OR_UPDATE); + ret.setTagModel(ServiceTags.TAGMODEL_RESOURCE_PRIVATE); + ret.setServiceName(serviceName); } - - ret = new RangerServiceResource(); - ret.setGuid(entity.getId()._getId()); - ret.setId(1L); - ret.setServiceName(serviceName); - ret.setResourceElements(elements); - return ret; } - static private Map<Long, RangerTag> getTags(EntityNotification entityNotification) { + static private Map<Long, RangerTag> getTags(AtlasEntityWithTraits entityWithTraits, long index) { Map<Long, RangerTag> ret = null; ret = new HashMap<Long, RangerTag>(); - long index = 1; - - List<IStruct> traits = entityNotification.getAllTraits(); + List<IStruct> traits = entityWithTraits.getAllTraits(); for (IStruct trait : traits) { @@ -288,113 +248,18 @@ public class AtlasNotificationMapper { if (MapUtils.isNotEmpty(tags)) { ret = new HashMap<Long, RangerTagDef>(); + for (Map.Entry<Long, RangerTag> entry : tags.entrySet()) { + RangerTagDef tagDef = new RangerTagDef(); tagDef.setName(entry.getValue().getType()); tagDef.setId(entry.getKey()); ret.put(entry.getKey(), tagDef); - } - } - - return ret; - } - - static private String getQualifiedNameAttributeName(String entityTypeName) { - String ret = StringUtils.equals(entityTypeName, ENTITY_TYPE_HIVE_TABLE) ? - ENTITY_ATTRIBUTE_QUALIFIED_NAME_FOR_HIVE_TABLE : ENTITY_ATTRIBUTE_QUALIFIED_NAME; - - return ret; - } - - static private List<String> getQualifiedNameComponents(IReferenceableInstance entity) throws Exception { - List<String> ret = null; - - String qualifiedNameAttributeName = getQualifiedNameAttributeName(entity.getTypeName()); - - String qualifiedName = getEntityAttribute(entity, qualifiedNameAttributeName, String.class); - - ret = getQualifiedNameComponents(entity.getTypeName(), qualifiedName); - - if (LOG.isDebugEnabled()) { - LOG.debug("----- Entity-Id:" + entity.getId()._getId()); - LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); - LOG.debug("----- Entity-Components -----"); - int i = 0; - for (String value : ret) { - LOG.debug("----- Index:" + i++ + " Value:" + value); } } - return ret; - } - - static public List<String> getQualifiedNameComponents(String entityTypeName, String qualifiedName) throws Exception { - - List<String> ret = null; - - String qualifiedNameAttributeName = getQualifiedNameAttributeName(entityTypeName); - - if (StringUtils.isBlank(qualifiedName)) { - throw new Exception("Could not get a valid value for " + qualifiedNameAttributeName + " attribute from entity notification."); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Received .... " + qualifiedNameAttributeName + "=" + qualifiedName + " for entity type " + entityTypeName); - } - - String components[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_CLUSTER_DELIMITER_STRING); - - if (components == null || components.length != 2) { - throw new Exception("Qualified Name does not contain cluster-name, qualifiedName=" + qualifiedName); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("name-hierarchy=" + components[0] + ", cluster-name=" + components[1]); - } - - String nameHierarchy[] = components[0].split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING); - - int hierarchyLevels = nameHierarchy.length; - - ret = new ArrayList<String>(); - - ret.add(qualifiedName); - ret.add(components[1]); - - for (int i = 0; i < hierarchyLevels; i++) { - ret.add(nameHierarchy[i]); - } return ret; } - static private String getServiceName(String clusterName, String entityTypeName) { - // Parse entityTypeName to get the Apache-component Name - // Assumption: entityTypeName is <componentName>_<component_specific_type_name> - // such as hive_table, hadoop_path, hbase_queue, etc. - String apacheComponents[] = entityTypeName.split("_"); - String apacheComponent = null; - if (apacheComponents.length > 0) { - apacheComponent = apacheComponents[0].toLowerCase(); - } - - return TagSyncConfig.getServiceName(apacheComponent, clusterName, properties); - } - - static private <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) { - T ret = null; - - try { - Map<String, Object> valueMap = entity.getValuesMap(); - ret = getAttribute(valueMap, name, type); - } catch (AtlasException exception) { - LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception); - } - - return ret; - } - static private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return type.cast(map.get(name)); - } - } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java deleted file mode 100644 index 42ba7c7..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - -import com.google.gson.Gson; - -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Provider; - -import org.apache.atlas.AtlasException; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.atlas.notification.NotificationConsumer; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; - -import org.apache.ranger.tagsync.model.TagSink; -import org.apache.ranger.tagsync.model.TagSource; -import org.apache.ranger.plugin.util.ServiceTags; - -import java.io.IOException; -import java.io.InputStream; -import java.util.*; - -public class TagAtlasSource implements TagSource { - private static final Log LOG = LogFactory.getLog(TagAtlasSource.class); - - public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties"; - - public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; - public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; - public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; - - private TagSink tagSink; - private Properties properties; - private ConsumerRunnable consumerTask; - - @Override - public boolean initialize(Properties properties) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.initialize()"); - } - - boolean ret = true; - - if (properties == null || MapUtils.isEmpty(properties)) { - LOG.error("No properties specified for TagFileSource initialization"); - this.properties = new Properties(); - } else { - this.properties = properties; - } - - Properties atlasProperties = new Properties(); - - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME); - - if (inputStream != null) { - try { - atlasProperties.load(inputStream); - } catch (Exception exception) { - ret = false; - LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception); - } finally { - try { - inputStream.close(); - } catch (IOException ioException) { - LOG.error("Cannot close Atlas application properties file, file-name:\" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException); - } - } - } else { - ret = false; - LOG.error("Cannot find Atlas application properties file"); - } - - if (ret) { - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!"); - } - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!"); - } - if (StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP))) { - ret = false; - LOG.error("Value of property '" + TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!"); - } - } - - if (ret) { - NotificationModule notificationModule = new NotificationModule(); - - Injector injector = Guice.createInjector(notificationModule); - - Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class); - NotificationInterface notification = consumerProvider.get(); - List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - - consumerTask = new ConsumerRunnable(iterators.get(0)); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.initialize(), result=" + ret); - } - return ret; - } - - @Override - public void setTagSink(TagSink sink) { - if (sink == null) { - LOG.error("Sink is null!!!"); - } else { - this.tagSink = sink; - } - } - - @Override - public Thread start() { - Thread consumerThread = null; - if (consumerTask == null) { - LOG.error("No consumerTask!!!"); - } else { - consumerThread = new Thread(consumerTask); - consumerThread.setDaemon(true); - consumerThread.start(); - } - return consumerThread; - } - - @Override - public void updateSink() throws Exception { - } - - @Override - public boolean isChanged() { - return true; - } - - // ----- inner class : ConsumerRunnable ------------------------------------ - - private class ConsumerRunnable implements Runnable { - - private final NotificationConsumer<EntityNotification> consumerIterator; - - private ConsumerRunnable(NotificationConsumer<EntityNotification> consumerIterator) { - this.consumerIterator = consumerIterator; - } - - // ----- Runnable -------------------------------------------------------- - - @Override - public void run() { - while (consumerIterator.hasNext()) { - try { - EntityNotification notification = consumerIterator.next(); - if (notification != null) { - printNotification(notification); - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties); - if (serviceTags == null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType()); - } - } else { - if (LOG.isDebugEnabled()) { - String serviceTagsJSON = new Gson().toJson(serviceTags); - LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON); - } - - try { - tagSink.uploadServiceTags(serviceTags); - } catch (Exception exception) { - LOG.error("uploadServiceTags() failed..", exception); - } - } - } - } catch(Exception e){ - LOG.error("Exception encountered when processing notification:", e); - } - } - } - - public void printNotification(EntityNotification notification) { - IReferenceableInstance entity = notification.getEntity(); - if (LOG.isDebugEnabled()) { - try { - LOG.debug("Notification-Type: " + notification.getOperationType()); - LOG.debug("Entity-Id: " + entity.getId()._getId()); - LOG.debug("Entity-Type: " + entity.getTypeName()); - - LOG.debug("----------- Entity Values ----------"); - - - for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) { - LOG.debug(" Name:" + entry.getKey()); - Object value = entry.getValue(); - LOG.debug(" Value:" + value); - } - - LOG.debug("----------- Entity Traits ----------"); - - List<IStruct> traits = notification.getAllTraits(); - - for (IStruct trait : traits) { - LOG.debug(" Trait-Type-Name:" + trait.getTypeName()); - Map<String, Object> traitValues = trait.getValuesMap(); - for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) { - LOG.debug(" Trait-Value-Name:" + valueEntry.getKey()); - LOG.debug(" Trait-Value:" + valueEntry.getValue()); - } - } - } catch (AtlasException exception) { - LOG.error("Cannot print notification - ", exception); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java new file mode 100644 index 0000000..c8ed948 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.atlasrest; + +import com.google.gson.Gson; + +import com.google.gson.GsonBuilder; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; +import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; +import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper; +import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; + +import java.util.*; + +public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { + private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class); + + private String atlasEndpoint; + private long sleepTimeBetweenCycleInMillis; + + @Override + public boolean initialize(Properties properties) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRESTTagSource.initialize()"); + } + + boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties); + + atlasEndpoint = TagSyncConfig.getAtlasEndpoint(properties); + sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties); + + if (StringUtils.isEmpty(atlasEndpoint)) { + LOG.info("No AtlasEndpoint specified, Initial download of Atlas-entities cannot be done."); + ret = false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRESTTagSource.initialize(), result=" + ret); + } + + return ret; + } + + @Override + public boolean start() { + + Thread atlasRESTInvokerThread = new Thread(this); + atlasRESTInvokerThread.setDaemon(true); + atlasRESTInvokerThread.start(); + + return atlasRESTInvokerThread != null; + } + + @Override + public void run() { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRESTTagSource.run()"); + } + + while (!shutdown) { + + synchUp(); + + LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds"); + + try { + + Thread.sleep(sleepTimeBetweenCycleInMillis); + + } catch (InterruptedException exception) { + LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", exception); + } + } + LOG.info("Shutting down the Tag-Atlasrest-source thread"); + } + + @Override + public boolean isChanged() { + return true; + } + + @Override + public void synchUp() { + + AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint); + + List<AtlasEntityWithTraits> atlasEntitiesWithTraits = atlasRESTUtil.getEntitiesWithTraits(); + + if (CollectionUtils.isNotEmpty(atlasEntitiesWithTraits)) { + if (LOG.isDebugEnabled()) { + for (AtlasEntityWithTraits element : atlasEntitiesWithTraits) { + LOG.debug(element); + } + } + + Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processEntitiesWithTraits(atlasEntitiesWithTraits); + + if (MapUtils.isNotEmpty(serviceTagsMap)) { + for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) { + if (LOG.isDebugEnabled()) { + Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z") + .setPrettyPrinting() + .create(); + String serviceTagsString = gsonBuilder.toJson(entry.getValue()); + + LOG.debug("serviceTags=" + serviceTagsString); + } + updateSink(entry.getValue()); + } + } + } + + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java new file mode 100644 index 0000000..7f4676a --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.tagsync.source.atlasrest; + +import com.google.gson.Gson; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.log4j.Logger; +import org.apache.ranger.admin.client.datatype.RESTResponse; +import org.apache.ranger.plugin.util.RangerRESTClient; +import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; +import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; + +import java.util.*; + +@SuppressWarnings("unchecked") +public class AtlasRESTUtil { + private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class); + + private static final String REST_MIME_TYPE_JSON = "application/json" ; + private static final String API_ATLAS_TYPES = "api/atlas/types"; + private static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; + private static final String API_ATLAS_ENTITY = "api/atlas/entities/"; + private static final String API_ATLAS_TYPE = "api/atlas/types/"; + + private static final String RESULTS_ATTRIBUTE = "results"; + private static final String DEFINITION_ATTRIBUTE = "definition"; + private static final String VALUES_ATTRIBUTE = "values"; + private static final String TRAITS_ATTRIBUTE = "traits"; + private static final String TYPE_NAME_ATTRIBUTE = "typeName"; + private static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; + private static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; + private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; + private static final String NAME_ATTRIBUTE = "name"; + + private final Gson gson = new Gson(); + + private RangerRESTClient atlasRESTClient; + + public AtlasRESTUtil(String atlasEndpoint) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasRESTUtil(" + atlasEndpoint + ")"); + } + + if (!atlasEndpoint.endsWith("/")) { + atlasEndpoint += "/"; + } + + // This uses RangerRESTClient to invoke REST APIs on Atlas. It will work only if scheme of URL is http + atlasRESTClient = new RangerRESTClient(); + atlasRESTClient.setUrl(atlasEndpoint); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasRESTUtil(" + atlasEndpoint + ")"); + } + } + + public List<AtlasEntityWithTraits> getEntitiesWithTraits() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getEntriesWithTraits()"); + } + + List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>(); + + Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES); + + List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); + + if (CollectionUtils.isNotEmpty(types)) { + + for (String type : types) { + + if (!AtlasResourceMapperUtil.isEntityTypeHandled(type)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not fetching Atlas entities of type:" + type); + } + continue; + } + + Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); + + List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); + + if (CollectionUtils.isEmpty(guids)) { + if (LOG.isDebugEnabled()) { + LOG.debug("No Atlas entities for type:" + type); + } + continue; + } + + for (String guid : guids) { + + Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); + + if (MapUtils.isNotEmpty(entityResponse) && entityResponse.containsKey(DEFINITION_ATTRIBUTE)) { + + Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class); + Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); + + if (MapUtils.isNotEmpty(traitsAttribute)) { + + List<IStruct> allTraits = new LinkedList<>(); + + for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) { + + Map<String, Object> trait = (Map<String, Object>) entry.getValue(); + + Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); + String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); + + List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); + + Struct trait1 = new Struct(traitTypeName, traitValues); + + allTraits.add(trait1); + allTraits.addAll(superTypes); + } + + IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true); + + if (entity != null) { + AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits); + ret.add(entityWithTraits); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid); + } + } + + } + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== getEntriesWithTraits()"); + } + } + + return ret; + } + + private Map<String, Object> getTraitType(String traitName) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getTraitType(" + traitName + ")"); + } + Map<String, Object> ret = null; + + Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); + + if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) { + + Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class); + + List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); + + if (traitTypes.size() > 0) { + ret = (Map<String, Object>) traitTypes.get(0); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getTraitType(" + traitName + ")"); + } + return ret; + } + + private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> getTraitSuperTypes()"); + } + List<IStruct> ret = new LinkedList<>(); + + if (traitType != null) { + + List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); + + for (String superTypeName : superTypeNames) { + + Map<String, Object> superTraitType = getTraitType(superTypeName); + + if (superTraitType != null) { + List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); + + Map<String, Object> superTypeValues = new HashMap<>(); + for (Map<String, Object> attributeDefinition : attributeDefinitions) { + + String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); + if (values.containsKey(attributeName)) { + superTypeValues.put(attributeName, values.get(attributeName)); + } + } + + List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values); + + Struct superTrait = new Struct(superTypeName, superTypeValues); + + ret.add(superTrait); + ret.addAll(superTraits); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== getTraitSuperTypes()"); + } + return ret; + } + + private Map<String, Object> atlasAPI(String endpoint) { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> atlasAPI(" + endpoint + ")"); + } + Map<String, Object> ret = new HashMap<String, Object>(); + + try { + WebResource webResource = atlasRESTClient.getResource(endpoint); + ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class); + + if (response != null && response.getStatus() == 200) { + ret = response.getEntity(ret.getClass()); + } else { + RESTResponse resp = RESTResponse.fromClientResponse(response); + LOG.error("Error getting atlas data request=" + webResource.toString() + + ", response=" + resp.toString()); + } + } catch (Exception exception) { + LOG.error("Exception when fetching Atlas objects.", exception); + ret = null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== atlasAPI(" + endpoint + ")"); + } + return ret; + } + + private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { + return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java new file mode 100644 index 0000000..43eb3b5 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.tagsync.source.file; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.process.TagSyncConfig; + +import java.io.*; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.Date; +import java.util.Properties; + +public class FileTagSource extends AbstractTagSource implements Runnable { + private static final Log LOG = LogFactory.getLog(FileTagSource.class); + + private String serviceTagsFileName; + private URL serviceTagsFileURL; + private long lastModifiedTimeInMillis = -1L; + + private Gson gsonBuilder; + private Properties properties; + private long fileModTimeCheckIntervalInMs; + + @Override + public boolean initialize(Properties props) { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> FileTagSource.initialize()"); + } + + if (props == null || MapUtils.isEmpty(props)) { + LOG.error("No properties specified for FileTagSource initialization"); + this.properties = new Properties(); + } else { + this.properties = props; + } + + gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); + + boolean ret = true; + + serviceTagsFileName = TagSyncConfig.getTagSourceFileName(properties); + + if (StringUtils.isBlank(serviceTagsFileName)) { + ret = false; + LOG.error("value of property 'ranger.tagsync.source.impl.class' is file and no value specified for property 'ranger.tagsync.filesource.filename'!"); + } + + if (ret) { + + fileModTimeCheckIntervalInMs = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties); + + if (LOG.isDebugEnabled()) { + LOG.debug("Provided serviceTagsFileName=" + serviceTagsFileName); + LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + fileModTimeCheckIntervalInMs + "ms"); + } + + InputStream serviceTagsFileStream = null; + + File f = new File(serviceTagsFileName); + + if (f.exists() && f.isFile() && f.canRead()) { + try { + serviceTagsFileStream = new FileInputStream(f); + serviceTagsFileURL = f.toURI().toURL(); + } catch (FileNotFoundException exception) { + LOG.error("Error processing input file:" + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName, exception); + } catch (MalformedURLException malformedException) { + LOG.error("Error processing input file:" + serviceTagsFileName + " cannot be converted to URL " + serviceTagsFileName, malformedException); + } + } else { + + URL fileURL = getClass().getResource(serviceTagsFileName); + if (fileURL == null) { + if (!serviceTagsFileName.startsWith("/")) { + fileURL = getClass().getResource("/" + serviceTagsFileName); + } + } + + if (fileURL == null) { + fileURL = ClassLoader.getSystemClassLoader().getResource(serviceTagsFileName); + if (fileURL == null) { + if (!serviceTagsFileName.startsWith("/")) { + fileURL = ClassLoader.getSystemClassLoader().getResource("/" + serviceTagsFileName); + } + } + } + + if (fileURL != null) { + + try { + serviceTagsFileStream = fileURL.openStream(); + serviceTagsFileURL = fileURL; + } catch (Exception exception) { + LOG.error(serviceTagsFileName + " is not a file", exception); + } + } else { + LOG.warn("Error processing input file: URL not found for " + serviceTagsFileName + " or no privilege for reading file " + serviceTagsFileName); + } + } + + if (serviceTagsFileStream != null) { + try { + serviceTagsFileStream.close(); + } catch (Exception e) { + // Ignore + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== FileTagSource.initialize(): sourceFileName=" + serviceTagsFileName + ", result=" + ret); + } + + return ret; + } + + @Override + public boolean start() { + + Thread fileMonitoringThread = new Thread(this); + fileMonitoringThread.setDaemon(true); + fileMonitoringThread.start(); + + return fileMonitoringThread != null; + } + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> FileTagSource.run()"); + } + + while (!shutdown) { + + try { + synchUp(); + + LOG.debug("Sleeping for [" + fileModTimeCheckIntervalInMs + "] milliSeconds"); + + Thread.sleep(fileModTimeCheckIntervalInMs); + } + catch (InterruptedException e) { + LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e); + } + catch (Throwable t) { + LOG.error("tag-sync thread got an error", t); + } + } + + LOG.info("Shutting down the Tag-file-source thread"); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== FileTagSource.run()"); + } + } + + @Override + public boolean isChanged() { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> FileTagSource.isChanged()"); + } + boolean ret = false; + + long modificationTime = getModificationTime(); + + if (modificationTime > lastModifiedTimeInMillis) { + if (LOG.isDebugEnabled()) { + Date modifiedDate = new Date(modificationTime); + Date lastModifiedDate = new Date(lastModifiedTimeInMillis); + LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate); + } + lastModifiedTimeInMillis = modificationTime; + ret = true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== FileTagSource.isChanged(): result=" + ret); + } + return ret; + } + + @Override + public void synchUp() { + if (isChanged()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Begin: update tags from source==>sink"); + } + + ServiceTags serviceTags = readFromFile(); + updateSink(serviceTags); + + if (LOG.isDebugEnabled()) { + LOG.debug("End: update tags from source==>sink"); + } + + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("FileTagSource: no change found for synchronization."); + } + } + } + private ServiceTags readFromFile() { + + if (LOG.isDebugEnabled()) { + LOG.debug("==> FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName); + } + + ServiceTags ret = null; + + if (serviceTagsFileURL != null) { + try ( + InputStream serviceTagsFileStream = serviceTagsFileURL.openStream(); + Reader reader = new InputStreamReader(serviceTagsFileStream, Charset.forName("UTF-8")) + ) { + + ret = gsonBuilder.fromJson(reader, ServiceTags.class); + + } catch (IOException e) { + LOG.warn("Error processing input file: or no privilege for reading file " + serviceTagsFileName, e); + } + } else { + LOG.error("Error reading file: " + serviceTagsFileName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== FileTagSource.readFromFile(): sourceFileName=" + serviceTagsFileName); + } + + return ret; + } + + private long getModificationTime() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> FileTagSource.getLastModificationTime(): sourceFileName=" + serviceTagsFileName); + } + long ret = 0L; + + File sourceFile = new File(serviceTagsFileName); + + if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) { + ret = sourceFile.lastModified(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== FileTagSource.lastModificationTime(): serviceTagsFileName=" + serviceTagsFileName + " result=" + new Date(ret)); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java deleted file mode 100644 index 92f24b2..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.file; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.tagsync.model.TagSink; -import org.apache.ranger.tagsync.model.TagSource; -import org.apache.ranger.plugin.util.ServiceTags; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -import java.io.*; -import java.util.Date; -import java.util.Properties; - -public class TagFileSource implements TagSource, Runnable { - private static final Log LOG = LogFactory.getLog(TagFileSource.class); - - private String sourceFileName; - private long lastModifiedTimeInMillis = 0L; - - private Gson gson; - private TagSink tagSink; - private Properties properties; - - @Override - public boolean initialize(Properties props) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.initialize()"); - } - - if (props == null || MapUtils.isEmpty(props)) { - LOG.error("No properties specified for TagFileSource initialization"); - this.properties = new Properties(); - } else { - this.properties = props; - } - - boolean ret = true; - - if (StringUtils.isBlank(TagSyncConfig.getTagSourceFileName(properties))) { - ret = false; - LOG.error("value of property 'ranger.tagsync.source.impl.class' is file and no value specified for property 'ranger.tagsync.filesource.filename'!"); - } - - if (ret) { - - long fileModTimeCheckIntervalInMs = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties); - - if (fileModTimeCheckIntervalInMs <= 0L) { - LOG.info("'ranger.tagsync.filesource.modtime.check.interval' is zero or negative! 'ranger.tagsync.filesource.modtime.check.interval'=" + fileModTimeCheckIntervalInMs + "ms"); - LOG.info("Setting 'ranger.tagsync.filesource.modtime.check.interval' to 60 seconds"); - fileModTimeCheckIntervalInMs = 60*1000; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("'ranger.tagsync.filesource.modtime.check.interval':" + fileModTimeCheckIntervalInMs + "ms"); - } - } - sourceFileName = TagSyncConfig.getTagSourceFileName(properties); - - if (LOG.isDebugEnabled()) { - LOG.debug("Provided sourceFileName=" + sourceFileName); - } - - String realFileName = TagSyncConfig.getResourceFileName(sourceFileName); - if (realFileName != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Real sourceFileName=" + realFileName); - } - sourceFileName = realFileName; - } else { - LOG.error(sourceFileName + " is not a file or is not readable"); - ret = false; - } - } - - if (ret) { - try { - gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); - } catch (Throwable excp) { - LOG.fatal("failed to create GsonBuilder object", excp); - ret = false; - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret); - } - - return ret; - } - - @Override - public void setTagSink(TagSink sink) { - if (sink == null) { - LOG.error("Sink is null!!!"); - } else { - this.tagSink = sink; - } - } - - @Override - public Thread start() { - - Thread fileMonitoringThread = null; - - fileMonitoringThread = new Thread(this); - fileMonitoringThread.setDaemon(true); - fileMonitoringThread.start(); - - return fileMonitoringThread; - } - - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.run()"); - } - long sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceFileModTimeCheckIntervalInMillis(properties); - boolean shutdownFlag = false; - - while (!shutdownFlag) { - - try { - if (isChanged()) { - LOG.info("Begin: update tags from source==>sink"); - if (TagSyncConfig.isTagSyncEnabled(properties)) { - updateSink(); - LOG.info("End: update tags from source==>sink"); - } else { - LOG.info("Tag-sync is not enabled."); - } - } else { - LOG.debug("TagFileSource: no change found for synchronization."); - } - - LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds"); - - Thread.sleep(sleepTimeBetweenCycleInMillis); - } - catch (InterruptedException e) { - LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", e); - shutdownFlag = true; - } - catch (Throwable t) { - LOG.error("tag-sync thread got an error", t); - } - } - - LOG.info("Shutting down the Tag-file-source thread"); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.run()"); - } - } - - @Override - public void updateSink() throws Exception { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.updateSink()"); - } - ServiceTags serviceTags = readFromFile(); - - if (serviceTags != null) { - tagSink.uploadServiceTags(serviceTags); - } else { - LOG.error("Could not read ServiceTags from file"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.updateSink()"); - } - } - - @Override - public boolean isChanged() { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.isChanged()"); - } - boolean ret = false; - - long modificationTime = getModificationTime(); - - if (modificationTime > lastModifiedTimeInMillis) { - if (LOG.isDebugEnabled()) { - Date modifiedDate = new Date(modificationTime); - Date lastModifiedDate = new Date(lastModifiedTimeInMillis); - LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate); - } - lastModifiedTimeInMillis = modificationTime; - ret = true; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.isChanged(): result=" + ret); - } - return ret; - } - - private ServiceTags readFromFile() { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); - } - - ServiceTags ret = null; - - Reader reader = null; - try { - - reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName)); - - ret = gson.fromJson(reader, ServiceTags.class); - - } - catch (FileNotFoundException exception) { - LOG.warn("Tag-source file does not exist or not readble '" + sourceFileName + "'"); - } - catch (Exception excp) { - LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp); - } - finally { - if (reader != null) { - try { - reader.close(); - } catch (Exception excp) { - LOG.error("error while closing opened Tag-source file " + sourceFileName, excp); - } - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName); - } - - return ret; - } - - private long getModificationTime() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagFileSource.getLastModificationTime(): sourceFileName=" + sourceFileName); - } - long ret = 0L; - - File sourceFile = new File(sourceFileName); - - if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) { - ret = sourceFile.lastModified(); - } else { - ret = new Date().getTime(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagFileSource.lastModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret)); - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/etc/ranger/data/tags.json ---------------------------------------------------------------------- diff --git a/tagsync/src/main/resources/etc/ranger/data/tags.json b/tagsync/src/main/resources/etc/ranger/data/tags.json index 274cf69..b4cd736 100644 --- a/tagsync/src/main/resources/etc/ranger/data/tags.json +++ b/tagsync/src/main/resources/etc/ranger/data/tags.json @@ -1,9 +1,7 @@ { "op":"add_or_update", - "tagModel":"shared", + "tagModel": "resource_private", "serviceName": "cl1_hive", - "tagVersion": 24, - "tagUpdateTime": "20150901-20:03:17.000-+0000", "tagDefinitions": { "1": { "name": "EXPIRES_ON", @@ -15,12 +13,7 @@ } ], "id": 1, - "guid": "1441137512654_323_77", - "isEnabled": true, - "createdBy": "Admin", - "updatedBy": "Admin", - "createTime": "20150901-19:58:33.000-+0000", - "updateTime": "20150901-19:58:33.000-+0000" + "guid": "tagdefinition-1-guid" } }, "tags": { @@ -30,12 +23,7 @@ "expiry_date": "2014/12/31" }, "id": 1, - "guid": "1441137512698_844_80", - "isEnabled": true, - "createdBy": "Admin", - "updatedBy": "Admin", - "createTime": "20150901-19:58:33.000-+0000", - "updateTime": "20150901-20:03:17.000-+0000" + "guid": "tag-1-guid" } }, "serviceResources": [ @@ -57,14 +45,8 @@ "isRecursive": false } }, - "resourceSignature": "c1114679b35a65d28e0dca4fdffc27d6", "id": 1, - "guid": "1441137512756_887_83", - "isEnabled": true, - "createdBy": "Admin", - "updatedBy": "Admin", - "createTime": "20150901-19:58:33.000-+0000", - "updateTime": "20150901-19:58:33.000-+0000" + "guid": "serviceresource-1-guid" } ], "resourceToTagIds": { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/main/resources/ranger-tagsync-default.xml ---------------------------------------------------------------------- diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml index b917d29..3784df8 100644 --- a/tagsync/src/main/resources/ranger-tagsync-default.xml +++ b/tagsync/src/main/resources/ranger-tagsync-default.xml @@ -55,4 +55,9 @@ <value>tagadmin</value> <description></description> </property> + <property> + <name>ranger.tagsync.atlasrestsource.download.interval</name> + <value>600000</value> + <description></description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/5b9c094f/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java index 59d521c..43c22d0 100644 --- a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java +++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java @@ -20,12 +20,7 @@ package org.apache.ranger.tagsync.process; -import org.apache.commons.lang.StringUtils; -import org.apache.ranger.tagsync.model.TagSource; -import org.apache.ranger.tagsync.process.TagSyncConfig; -import org.apache.ranger.tagsync.process.TagSynchronizer; -import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper; -import org.apache.ranger.tagsync.source.atlas.TagAtlasSource; +import org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -34,8 +29,6 @@ import java.io.*; import java.util.List; import java.util.Properties; -import static org.junit.Assert.*; - public class TestTagSynchronizer { @@ -79,7 +72,7 @@ public class TestTagSynchronizer { @Test public void testTagDownload() { - boolean initDone = false; + boolean initDone = true; /* For tagSynchronizer.initialize() to succeed, edit ranger-tagsync-site.xml file to contain correct values of the following properties: @@ -97,10 +90,12 @@ public class TestTagSynchronizer { */ - //initDone = tagSynchronizer.initialize(); +// initDone = tagSynchronizer.initialize(null); System.out.println("TagSynchronizer initialization result=" + initDone); + assert(initDone); + System.out.println("Exiting testTagDownload()"); } @@ -108,14 +103,15 @@ public class TestTagSynchronizer { public void testQualifiedNames() { List<String> components; + AtlasHiveResourceMapper hiveResourceBuilder = new AtlasHiveResourceMapper(); try { - components = AtlasNotificationMapper.getQualifiedNameComponents("hive_db", "database@cluster"); + components = hiveResourceBuilder.getQualifiedNameComponents("hive_db", "database@cluster"); printComponents(components); - components = AtlasNotificationMapper.getQualifiedNameComponents("hive_table", "database.table@cluster"); + components = hiveResourceBuilder.getQualifiedNameComponents("hive_table", "database.table@cluster"); printComponents(components); - components = AtlasNotificationMapper.getQualifiedNameComponents("hive_column", "database.table.column@cluster"); + components = hiveResourceBuilder.getQualifiedNameComponents("hive_column", "database.table.column@cluster"); printComponents(components); assert(true);
