Repository: ranger Updated Branches: refs/heads/ranger-0.7 8ebad64ef -> 30b1188fe
RANGER-1897: tagsync update to replace Atlas V1 API usage with Atlas V2 API for tag-download using REST Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/30b1188f Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/30b1188f Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/30b1188f Branch: refs/heads/ranger-0.7 Commit: 30b1188fe54788bcca3216dbeeb2f956e5cb9c9d Parents: 8ebad64 Author: Madhan Neethiraj <[email protected]> Authored: Tue Nov 21 11:03:53 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Nov 25 11:59:58 2017 -0800 ---------------------------------------------------------------------- pom.xml | 9 +- src/main/assembly/tagsync.xml | 12 +- tagsync/pom.xml | 42 ++- .../source/atlas/AtlasHbaseResourceMapper.java | 19 +- .../source/atlas/AtlasHdfsResourceMapper.java | 17 +- .../source/atlas/AtlasHiveResourceMapper.java | 19 +- .../source/atlas/AtlasKafkaResourceMapper.java | 15 +- .../source/atlas/AtlasNotificationMapper.java | 317 ++++++++++++++---- .../source/atlas/AtlasResourceMapper.java | 7 + .../source/atlas/AtlasResourceMapperUtil.java | 25 ++ .../tagsync/source/atlas/AtlasTagSource.java | 48 +-- .../source/atlasrest/AtlasRESTTagSource.java | 129 +++++--- .../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 ------------------- 13 files changed, 512 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 80de97e..cc09475 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ <asm.all.version>3.2</asm.all.version> <asm.version>3.1</asm.version> <aspectj.version>1.8.2</aspectj.version> - <atlas.version>0.7-incubating</atlas.version> + <atlas.version>0.8.2-SNAPSHOT</atlas.version> <atlas.guava.version>14.0</atlas.guava.version> <atlas.gson.version>2.5</atlas.gson.version> <atlas.jettison.version>1.3.7</atlas.jettison.version> @@ -357,7 +357,12 @@ <groupId>com.webcohesion.enunciate</groupId> <artifactId>enunciate-core-annotations</artifactId> <version>2.8.0</version> - </dependency> + </dependency> + <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${jersey-bundle.version}</version> + </dependency> </dependencies> </dependencyManagement> <build> http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/src/main/assembly/tagsync.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml index 0b17151..26b42ca 100644 --- a/src/main/assembly/tagsync.xml +++ b/src/main/assembly/tagsync.xml @@ -40,12 +40,16 @@ <include>com.google.inject:guice:jar:${guice.version}</include> <include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include> <include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include> + <include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include> <include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include> <include>com.yammer.metrics:metrics-core</include> <include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-client:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-common:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include> <include>org.apache.hadoop:hadoop-auth</include> <include>org.apache.hadoop:hadoop-common</include> <include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include> @@ -55,10 +59,10 @@ <include>org.apache.ranger:ranger-plugins-common</include> <include>org.apache.ranger:ranger-util</include> <include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include> - <include>org.codehaus.jackson:jackson-core-asl</include> - <include>org.codehaus.jackson:jackson-jaxrs</include> - <include>org.codehaus.jackson:jackson-mapper-asl</include> - <include>org.codehaus.jackson:jackson-xc</include> + <include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include> + <include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include> + <include>org.codehaus.jackson:jackson-mapper-asl:jar:${codehaus.jackson.version}</include> + <include>org.codehaus.jackson:jackson-xc:jar:${codehaus.jackson.version}</include> <include>org.codehaus.jettison:jettison:jar:${jettison.version}</include> <include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include> <include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include> http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/pom.xml ---------------------------------------------------------------------- diff --git a/tagsync/pom.xml b/tagsync/pom.xml index 42e9d2f..417a12f 100644 --- a/tagsync/pom.xml +++ b/tagsync/pom.xml @@ -55,6 +55,11 @@ <version>${jersey-bundle.version}</version> </dependency> <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${sun-jersey-bundle.version}</version> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>${commons.cli.version}</version> @@ -110,6 +115,26 @@ <version>${jettison.version}</version> </dependency> <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>${codehaus.jackson.version}</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + <version>${codehaus.jackson.version}</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>${codehaus.jackson.version}</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + <version>${codehaus.jackson.version}</version> + </dependency> + <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-notification</artifactId> <version>${atlas.version}</version> @@ -121,7 +146,17 @@ </dependency> <dependency> <groupId>org.apache.atlas</groupId> - <artifactId>atlas-client</artifactId> + <artifactId>atlas-client-v1</artifactId> + <version>${atlas.version}</version> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-common</artifactId> + <version>${atlas.version}</version> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> <version>${atlas.version}</version> </dependency> <dependency> @@ -144,6 +179,11 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-intg</artifactId> + <version>${atlas.version}</version> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java index 8b36a31..00615e4 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java @@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.Map; import java.util.HashMap; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; @@ -48,7 +49,23 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper { @Override public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityType = entity.getTypeName(); String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, entityType, qualifiedName); + } + + @Override + public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception { + String entityGuid = entity.getGuid(); + String entityType = entity.getTypeName(); + String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, entityType, qualifiedName); + } + + private RangerServiceResource getServiceResource(String entityGuid, String entityType, String qualifiedName) throws Exception { if (StringUtils.isEmpty(qualifiedName)) { throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); } @@ -63,8 +80,6 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper { throwExceptionWithMessage("cluster-name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName); } - String entityType = entity.getTypeName(); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String serviceName = getRangerServiceName(clusterName); Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>(); http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java index 06bff90..d970859 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java @@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.HashMap; import java.util.Map; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -57,10 +58,25 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper { @Override public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String path = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class); String clusterName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class); String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + return getServiceResource(entityGuid, path, clusterName, qualifiedName); + } + + @Override + public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception { + String entityGuid = entity.getGuid(); + String path = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class); + String clusterName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class); + String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, path, clusterName, qualifiedName); + } + + private RangerServiceResource getServiceResource(String entityGuid, String path, String clusterName, String qualifiedName) throws Exception { if(StringUtils.isEmpty(path)) { path = getResourceNameFromQualifiedName(qualifiedName); @@ -81,7 +97,6 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper { } } - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String serviceName = getRangerServiceName(clusterName); Boolean isExcludes = Boolean.FALSE; Boolean isRecursive = Boolean.TRUE; http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java index a359622..84d1226 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java @@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.Map; import java.util.HashMap; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; @@ -47,7 +48,23 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper { @Override public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityType = entity.getTypeName(); String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, entityType, qualifiedName); + } + + @Override + public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception { + String entityGuid = entity.getGuid(); + String entityType = entity.getTypeName(); + String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, entityType, qualifiedName); + } + + private RangerServiceResource getServiceResource(String entityGuid, String entityType, String qualifiedName) throws Exception { if (StringUtils.isEmpty(qualifiedName)) { throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); } @@ -62,8 +79,6 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper { throwExceptionWithMessage("cluster-name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName); } - String entityType = entity.getTypeName(); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String serviceName = getRangerServiceName(clusterName); String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER); String dbName = resources.length > 0 ? resources[0] : null; http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java index 9f1fc2d..0c0247f 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java @@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.HashMap; import java.util.Map; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy; @@ -42,8 +43,21 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper { @Override public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { + String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + return getServiceResource(entityGuid, qualifiedName); + } + + @Override + public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception { + String entityGuid = entity.getGuid(); + String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + + return getServiceResource(entityGuid, qualifiedName); + } + + private RangerServiceResource getServiceResource(String entityGuid, String qualifiedName) throws Exception { String topic = getResourceNameFromQualifiedName(qualifiedName); if(StringUtils.isEmpty(topic)) { @@ -67,7 +81,6 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper { elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive)); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; String serviceName = getRangerServiceName(clusterName); return new RangerServiceResource(entityGuid, serviceName, elements); http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 922317e..f42c908 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -20,7 +20,16 @@ package org.apache.ranger.tagsync.source.atlas; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.type.AtlasBuiltInTypes; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.persistence.Id; @@ -35,10 +44,9 @@ import org.apache.ranger.plugin.model.RangerTagDef; import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef; import org.apache.ranger.plugin.util.ServiceTags; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; public class AtlasNotificationMapper { private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class); @@ -46,6 +54,17 @@ public class AtlasNotificationMapper { private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>(); + private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() { + @Override + protected DateFormat initialValue() { + SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR); + + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + return dateFormat; + } + }; + private static void logUnhandledEntityNotification(EntityNotification entityNotification) { final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes @@ -134,6 +153,7 @@ public class AtlasNotificationMapper { case ENTITY_UPDATE: case ENTITY_DELETE: case TRAIT_ADD: + case TRAIT_UPDATE: case TRAIT_DELETE: { ret = true; break; @@ -175,7 +195,6 @@ public class AtlasNotificationMapper { } static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception { - Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>(); for (AtlasEntityWithTraits element : entitiesWithTraits) { @@ -189,11 +208,163 @@ public class AtlasNotificationMapper { } } + return ret; + } + + static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception { + ServiceTags ret = null; + IReferenceableInstance entity = entityWithTraits.getEntity(); + RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); + + if (serviceResource != null) { + List<RangerTag> tags = getTags(entityWithTraits); + List<RangerTagDef> tagDefs = getTagDefs(entityWithTraits); + String serviceName = serviceResource.getServiceName(); + + ret = createOrGetServiceTags(serviceTagsMap, serviceName); + + if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) { + serviceResource.setId((long) ret.getServiceResources().size()); + ret.getServiceResources().add(serviceResource); + + List<Long> tagIds = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(tags)) { + for (RangerTag tag : tags) { + tag.setId((long) ret.getTags().size()); + ret.getTags().put(tag.getId(), tag); + + tagIds.add(tag.getId()); + } + } + ret.getResourceToTagIds().put(serviceResource.getId(), tagIds); + + if (CollectionUtils.isNotEmpty(tagDefs)) { + for (RangerTagDef tagDef : tagDefs) { + tagDef.setId((long) ret.getTagDefinitions().size()); + ret.getTagDefinitions().put(tagDef.getId(), tagDef); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done."); + LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists, will be removed from ranger"); + } + } + } else { + LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId()); + } + + return ret; + } + + static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) { + ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName); + + if (ret == null) { + ret = new ServiceTags(); + + if (serviceTagsMap != null) { + serviceTagsMap.put(serviceName, ret); + } + + ret.setOp(ServiceTags.OP_ADD_OR_UPDATE); + ret.setServiceName(serviceName); + } + + return ret; + } + + static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) { + List<RangerTag> ret = new ArrayList<RangerTag>(); + IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null; + + if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) { + for (String traitName : entity.getTraits()) { + IStruct trait = entity.getTrait(traitName); + Map<String, String> tagAttrs = new HashMap<String, String>(); + + try { + Map<String, Object> attrs = trait.getValuesMap(); + + if(MapUtils.isNotEmpty(attrs)) { + for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) { + String attrName = attrEntry.getKey(); + Object attrValue = attrEntry.getValue(); + + tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null); + } + } + + } catch (AtlasException exception) { + LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); + } + + ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE)); + } + } + + return ret; + } + + static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) { + List<RangerTagDef> ret = new ArrayList<RangerTagDef>(); + IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null; + + if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) { + for (String traitName : entity.getTraits()) { + IStruct trait = entity.getTrait(traitName); + RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); + + try { + Map<String, Object> attrs = trait.getValuesMap(); + + if(MapUtils.isNotEmpty(attrs)) { + for (String attrName : attrs.keySet()) { + tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); + } + } + } catch (AtlasException exception) { + LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); + } + + ret.add(tagDef); + } + } + + return ret; + } + + public static Map<String, ServiceTags> processSearchResult(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) { + Map<String, ServiceTags> ret = null; + + try { + ret = buildServiceTags(result, typeRegistry); + } catch (Exception exception) { + LOG.error("Failed to build serviceTags", exception); + } + + return ret; + } + + static private Map<String, ServiceTags> buildServiceTags(AtlasSearchResult result, AtlasTypeRegistry typeRegistry) throws Exception { + Map<String, ServiceTags> ret = new HashMap<>(); + + for (AtlasEntityHeader entity : result.getEntities()) { + if (entity != null && entity.getStatus() == AtlasEntity.Status.ACTIVE) { + buildServiceTags(entity, typeRegistry, ret); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring entity because its State is not ACTIVE: " + entity); + } + } + } + // Remove duplicate tag definitions if(CollectionUtils.isNotEmpty(ret.values())) { for (ServiceTags serviceTag : ret.values()) { if(MapUtils.isNotEmpty(serviceTag.getTagDefinitions())) { - Map<String, RangerTagDef> uniqueTagDefs = new HashMap<String, RangerTagDef>(); + Map<String, RangerTagDef> uniqueTagDefs = new HashMap<>(); for (RangerTagDef tagDef : serviceTag.getTagDefinitions().values()) { RangerTagDef existingTagDef = uniqueTagDefs.get(tagDef.getName()); @@ -239,25 +410,22 @@ public class AtlasNotificationMapper { return ret; } - static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception { - ServiceTags ret = null; - IReferenceableInstance entity = entityWithTraits.getEntity(); - RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); + static private ServiceTags buildServiceTags(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry, Map<String, ServiceTags> serviceTagsMap) throws Exception { + ServiceTags ret = null; + RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); if (serviceResource != null) { - - List<RangerTag> tags = getTags(entityWithTraits); - List<RangerTagDef> tagDefs = getTagDefs(entityWithTraits); + List<RangerTag> tags = getTags(entity, typeRegistry); + List<RangerTagDef> tagDefs = getTagDefs(entity); String serviceName = serviceResource.getServiceName(); ret = createOrGetServiceTags(serviceTagsMap, serviceName); if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) { - serviceResource.setId((long) ret.getServiceResources().size()); ret.getServiceResources().add(serviceResource); - List<Long> tagIds = new ArrayList<Long>(); + List<Long> tagIds = new ArrayList<>(); if (CollectionUtils.isNotEmpty(tags)) { for (RangerTag tag : tags) { @@ -277,90 +445,125 @@ public class AtlasNotificationMapper { } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done."); + LOG.debug("Entity " + entity + " does not have any tags associated with it when full-sync is being done."); LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists, will be removed from ranger"); } } } else { - LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId()); + LOG.error("Failed to build serviceResource for entity:" + entity.getGuid()); } return ret; } - static private ServiceTags createOrGetServiceTags(Map<String, ServiceTags> serviceTagsMap, String serviceName) { - ServiceTags ret = serviceTagsMap == null ? null : serviceTagsMap.get(serviceName); + static private List<RangerTag> getTags(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry) { + List<RangerTag> ret = new ArrayList<>(); - if (ret == null) { - ret = new ServiceTags(); + if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) { + List<AtlasClassification> classifications = entity.getClassifications(); - if (serviceTagsMap != null) { - serviceTagsMap.put(serviceName, ret); + for (AtlasClassification classification : classifications) { + ret.add(getRangerTag(classification, typeRegistry)); + + List<AtlasClassification> superClassifications = getSuperClassifications(classification, typeRegistry); + + if (CollectionUtils.isNotEmpty(superClassifications)) { + for (AtlasClassification superClassification : superClassifications) { + ret.add(getRangerTag(superClassification, typeRegistry)); + } + } } + } - ret.setOp(ServiceTags.OP_ADD_OR_UPDATE); - ret.setServiceName(serviceName); + return ret; + } + + static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) { + List<RangerTagDef> ret = new ArrayList<>(); + + if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) { + List<AtlasClassification> traits = entity.getClassifications(); + + for (AtlasClassification trait : traits) { + RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); + + if(MapUtils.isNotEmpty(trait.getAttributes())) { + for (String attrName : trait.getAttributes().keySet()) { + tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); + } + } + + ret.add(tagDef); + } } return ret; } - static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) { - List<RangerTag> ret = new ArrayList<RangerTag>(); + static private List<AtlasClassification> getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry typeRegistry) { + List<AtlasClassification> ret = null; + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); - if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { - List<IStruct> traits = entityWithTraits.getAllTraits(); + if (classificationType != null && CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) { + ret = new ArrayList<>(classificationType.getAllSuperTypes().size()); - for (IStruct trait : traits) { - Map<String, String> tagAttrs = new HashMap<String, String>(); + for (String superTypeName : classificationType.getAllSuperTypes()) { + AtlasClassification superClassification = new AtlasClassification(superTypeName); - try { - Map<String, Object> attrs = trait.getValuesMap(); + if (MapUtils.isNotEmpty(classification.getAttributes())) { + AtlasClassificationType superClassificationType = typeRegistry.getClassificationTypeByName(superTypeName); - if(MapUtils.isNotEmpty(attrs)) { - for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) { - String attrName = attrEntry.getKey(); - Object attrValue = attrEntry.getValue(); + if (superClassificationType != null && MapUtils.isNotEmpty(superClassificationType.getAllAttributes())) { + Map<String, Object> superClassificationAttributes = new HashMap<>(); - tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null); + for (Map.Entry<String, Object> entry : classification.getAttributes().entrySet()) { + String attrName = entry.getKey(); + + if (superClassificationType.getAllAttributes().containsKey(attrName)) { + superClassificationAttributes.put(attrName, entry.getValue()); + } } + + superClassification.setAttributes(superClassificationAttributes); } - } catch (AtlasException exception) { - LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); } - ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE)); + ret.add(superClassification); } } return ret; } - static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) { - List<RangerTagDef> ret = new ArrayList<RangerTagDef>(); + static private RangerTag getRangerTag(AtlasClassification classification, AtlasTypeRegistry typeRegistry) { + final Map<String, String> tagAttrs; - if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { - List<IStruct> traits = entityWithTraits.getAllTraits(); + if(MapUtils.isNotEmpty(classification.getAttributes())) { + tagAttrs = new HashMap<>(); - for (IStruct trait : traits) { - RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); + for (Map.Entry<String, Object> attrEntry : classification.getAttributes().entrySet()) { + String attrName = attrEntry.getKey(); + Object attrValue = attrEntry.getValue(); - try { - Map<String, Object> attrs = trait.getValuesMap(); + // V2 Atlas APIs have date attributes as number; convert the value to earlier version format, so that + // Ranger conditions can recognize the value correctly + if (attrValue instanceof Number) { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + AtlasAttribute attribute = (classificationType != null) ? classificationType.getAttribute(attrName) : null; - if(MapUtils.isNotEmpty(attrs)) { - for (String attrName : attrs.keySet()) { - tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); - } + if (attribute != null && attribute.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) { + Date dateValue = new Date(((Number)attrValue).longValue()); + + attrValue = DATE_FORMATTER.get().format(dateValue); } - } catch (AtlasException exception) { - LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); } - ret.add(tagDef); + tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null); } + } else { + tagAttrs = Collections.emptyMap(); } - return ret; + return new RangerTag(null, classification.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE); } } http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java index 8ececdf..a2ad796 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java @@ -23,6 +23,7 @@ import java.util.Properties; import java.util.Map; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -75,6 +76,8 @@ public abstract class AtlasResourceMapper { abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception; + abstract public RangerServiceResource buildResource(final AtlasEntityHeader entity) throws Exception; + protected String getCustomRangerServiceName(String atlasInstanceName) { if(properties != null) { String propName = TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX + componentName @@ -132,6 +135,10 @@ public abstract class AtlasResourceMapper { return ret; } + static protected <T> T getEntityAttribute(AtlasEntityHeader entity, String name, Class<T> type) { + return getAttribute(entity.getAttributes(), name, type); + } + static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { return type.cast(map.get(name)); } http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java index f9f0eaf..d004bff 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java @@ -19,6 +19,7 @@ package org.apache.ranger.tagsync.source.atlas; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerServiceResource; @@ -74,6 +75,30 @@ public class AtlasResourceMapperUtil { return resource; } + public static RangerServiceResource getRangerServiceResource(AtlasEntityHeader atlasEntity) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getRangerServiceResource(" + atlasEntity.getGuid() +")"); + } + + RangerServiceResource resource = null; + + AtlasResourceMapper mapper = atlasResourceMappers.get(atlasEntity.getTypeName()); + + if (mapper != null) { + try { + resource = mapper.buildResource(atlasEntity); + } catch (Exception exception) { + LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getGuid() + ": ", exception); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getRangerServiceResource(" + atlasEntity.getGuid() +"): resource=" + resource); + } + + return resource; + } + static public boolean initializeAtlasResourceMappers(Properties properties) { final String MAPPER_NAME_DELIMITER = ","; http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 12b02d9..95ff8ec 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -20,26 +20,22 @@ package org.apache.ranger.tagsync.source.atlas; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Provider; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - +import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.entity.EntityNotification; - -import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.kafka.common.TopicPartition; import java.io.IOException; import java.io.InputStream; -import java.util.Properties; import java.util.List; +import java.util.Properties; public class AtlasTagSource extends AbstractTagSource { private static final Log LOG = LogFactory.getLog(AtlasTagSource.class); @@ -102,12 +98,7 @@ public class AtlasTagSource extends AbstractTagSource { } if (ret) { - NotificationModule notificationModule = new NotificationModule(); - - Injector injector = Guice.createInjector(notificationModule); - - Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class); - NotificationInterface notification = consumerProvider.get(); + NotificationInterface notification = NotificationProvider.get(); List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); consumerTask = new ConsumerRunnable(iterators.get(0)); @@ -163,15 +154,6 @@ public class AtlasTagSource extends AbstractTagSource { this.consumer = consumer; } - private boolean hasNext() { - boolean ret = false; - try { - ret = consumer.hasNext(); - } catch (Exception exception) { - LOG.error("EntityNotification consumer threw exception, IGNORING...:", exception); - } - return ret; - } @Override public void run() { @@ -180,8 +162,11 @@ public class AtlasTagSource extends AbstractTagSource { } while (true) { try { - if (hasNext()) { - EntityNotification notification = consumer.peek(); + List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L); + + for (AtlasKafkaMessage<EntityNotification> message : messages) { + EntityNotification notification = message != null ? message.getMessage() : null; + if (notification != null) { if (LOG.isDebugEnabled()) { LOG.debug("Notification=" + getPrintableEntityNotification(notification)); @@ -191,11 +176,12 @@ public class AtlasTagSource extends AbstractTagSource { if (serviceTags != null) { updateSink(serviceTags); } + + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + consumer.commit(partition, message.getOffset()); } else { LOG.error("Null entityNotification received from Kafka!! Ignoring.."); } - // Move iterator forward - consumer.next(); } } catch (Exception exception) { LOG.error("Caught exception..: ", exception); http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java index 4e0ae90..239f143 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java @@ -23,48 +23,52 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.apache.commons.collections.CollectionUtils; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ranger.plugin.util.RangerRESTClient; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ranger.tagsync.model.AbstractTagSource; import org.apache.ranger.plugin.util.ServiceTags; import org.apache.ranger.tagsync.model.TagSink; import org.apache.ranger.tagsync.process.TagSyncConfig; import org.apache.ranger.tagsync.process.TagSynchronizer; -import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper; import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; -import java.util.List; +import java.io.IOException; import java.util.Map; import java.util.Properties; public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class); - private long sleepTimeBetweenCycleInMillis; - - private AtlasRESTUtil atlasRESTUtil = null; - - private Thread myThread = null; + private long sleepTimeBetweenCycleInMillis; + private String[] restUrls = null; + private boolean isKerberized = false; + private String[] userNamePassword = null; + private Thread myThread = null; public static void main(String[] args) { - - AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource(); - - TagSyncConfig config = TagSyncConfig.getInstance(); - - Properties props = config.getProperties(); + TagSyncConfig config = TagSyncConfig.getInstance(); + Properties props = config.getProperties(); + TagSink tagSink = TagSynchronizer.initializeTagSink(props); TagSynchronizer.printConfigurationProperties(props); - TagSink tagSink = TagSynchronizer.initializeTagSink(props); - if (tagSink != null) { + AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource(); if (atlasRESTTagSource.initialize(props)) { try { @@ -79,46 +83,45 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { LOG.error("AtlasRESTTagSource initialized failed, exiting."); System.exit(1); } - } else { LOG.error("TagSink initialialization failed, exiting."); System.exit(1); } - } + @Override public boolean initialize(Properties properties) { if (LOG.isDebugEnabled()) { LOG.debug("==> AtlasRESTTagSource.initialize()"); } + sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties); + boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties); - sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties); - final boolean isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null; + String sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties); + + this.isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null; + this.userNamePassword = new String[] { TagSyncConfig.getAtlasRESTUserName(properties), TagSyncConfig.getAtlasRESTPassword(properties) }; - String restUrl = TagSyncConfig.getAtlasRESTEndpoint(properties); - String sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties); - String userName = TagSyncConfig.getAtlasRESTUserName(properties); - String password = TagSyncConfig.getAtlasRESTPassword(properties); + String restEndpoint = TagSyncConfig.getAtlasRESTEndpoint(properties); if (LOG.isDebugEnabled()) { - LOG.debug("restUrl=" + restUrl); + LOG.debug("restEndpoint=" + restEndpoint); LOG.debug("sslConfigFile=" + sslConfigFile); - LOG.debug("userName=" + userName); + LOG.debug("userName=" + userNamePassword[0]); LOG.debug("kerberized=" + isKerberized); } - if (StringUtils.isNotEmpty(restUrl)) { - if (!restUrl.endsWith("/")) { - restUrl += "/"; - } - RangerRESTClient atlasRESTClient = new RangerRESTClient(restUrl, sslConfigFile); + if (StringUtils.isNotEmpty(restEndpoint)) { + this.restUrls = restEndpoint.split(","); - if (!isKerberized) { - atlasRESTClient.setBasicAuthInfo(userName, password); + for (int i = 0; i < restUrls.length; i++) { + if (!restUrls[i].endsWith("/")) { + restUrls[i] += "/"; + } } - atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, isKerberized); + } else { LOG.info("AtlasEndpoint not specified, Initial download of Atlas-entities cannot be done."); ret = false; @@ -133,7 +136,6 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { @Override public boolean start() { - myThread = new Thread(this); myThread.setDaemon(true); myThread.start(); @@ -150,21 +152,17 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { @Override public void run() { - if (LOG.isDebugEnabled()) { LOG.debug("==> AtlasRESTTagSource.run()"); } while (true) { - synchUp(); LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds"); try { - Thread.sleep(sleepTimeBetweenCycleInMillis); - } catch (InterruptedException exception) { LOG.error("Interrupted..: ", exception); return; @@ -173,17 +171,40 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { } public void synchUp() { + SearchParameters searchParams = new SearchParameters(); + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTransientTypeRegistry tty = null; + AtlasSearchResult searchResult = null; + + searchParams.setClassification("*"); + searchParams.setIncludeClassificationAttributes(true); + searchParams.setOffset(0); + searchParams.setLimit(Integer.MAX_VALUE); + + try { + AtlasClientV2 atlasClient = getAtlasClient(); + + searchResult = atlasClient.facetedSearch(searchParams); - List<AtlasEntityWithTraits> atlasEntities = atlasRESTUtil.getAtlasEntities(); + AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter()); - if (CollectionUtils.isNotEmpty(atlasEntities)) { + tty = typeRegistry.lockTypeRegistryForUpdate(); + + tty.addTypes(typesDef); + } catch (AtlasServiceException | AtlasBaseException | IOException excp) { + LOG.error("failed to download tags from Atlas", excp); + } finally { + if (tty != null) { + typeRegistry.releaseTypeRegistryForUpdate(tty, true); + } + } + + if (searchResult != null) { if (LOG.isDebugEnabled()) { - for (AtlasEntityWithTraits element : atlasEntities) { - LOG.debug(element); - } + LOG.debug(AtlasType.toJson(searchResult)); } - Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntities); + Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processSearchResult(searchResult, typeRegistry); if (MapUtils.isNotEmpty(serviceTagsMap)) { for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) { @@ -195,6 +216,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { LOG.debug("serviceTags=" + serviceTagsString); } + updateSink(entry.getValue()); } } @@ -202,5 +224,20 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { } + private AtlasClientV2 getAtlasClient() throws IOException { + final AtlasClientV2 ret; + + if (isKerberized) { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + + ugi.checkTGTAndReloginFromKeytab(); + + ret = new AtlasClientV2(ugi, ugi.getShortUserName(), restUrls); + } else { + ret = new AtlasClientV2(restUrls, userNamePassword); + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/ranger/blob/30b1188f/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java deleted file mode 100644 index 00a101e..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ranger.tagsync.source.atlasrest; - -import com.google.gson.Gson; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.log4j.Logger; -import org.apache.ranger.admin.client.datatype.RESTResponse; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; -import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; - -import java.io.IOException; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -@SuppressWarnings("unchecked") -public class AtlasRESTUtil { - private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class); - - private static final String REST_MIME_TYPE_JSON = "application/json"; - private static final String API_ATLAS_TYPES = "api/atlas/types"; - private static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; - private static final String API_ATLAS_ENTITY = "api/atlas/entities/"; - private static final String API_ATLAS_TYPE = "api/atlas/types/"; - - private static final String RESULTS_ATTRIBUTE = "results"; - private static final String DEFINITION_ATTRIBUTE = "definition"; - private static final String VALUES_ATTRIBUTE = "values"; - private static final String TRAITS_ATTRIBUTE = "traits"; - private static final String TYPE_NAME_ATTRIBUTE = "typeName"; - private static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; - private static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; - private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; - private static final String NAME_ATTRIBUTE = "name"; - - private final Gson gson = new Gson(); - - private final RangerRESTClient atlasRESTClient; - - private final boolean isKerberized; - - public AtlasRESTUtil(RangerRESTClient atlasRESTClient, boolean isKerberized) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasRESTUtil()"); - } - - this.atlasRESTClient = atlasRESTClient; - - this.isKerberized = isKerberized; - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasRESTUtil()"); - } - } - - public List<AtlasEntityWithTraits> getAtlasEntities() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getAtlasEntities()"); - } - - List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>(); - - Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES); - - List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(types)) { - - for (String type : types) { - - if (!AtlasResourceMapperUtil.isEntityTypeHandled(type)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not fetching Atlas entities of type: " + type); - } - continue; - } - - Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); - - List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); - - if (CollectionUtils.isEmpty(guids)) { - if (LOG.isDebugEnabled()) { - LOG.debug("No Atlas entities for type: " + type); - } - continue; - } - - for (String guid : guids) { - - Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); - - Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class); - - Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); - - List<IStruct> allTraits = new LinkedList<>(); - - if (MapUtils.isNotEmpty(traitsAttribute)) { - - for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) { - - Map<String, Object> trait = (Map<String, Object>) entry.getValue(); - - Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); - String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); - - if (StringUtils.isEmpty(traitTypeName)) { - continue; - } - - List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); - - Struct trait1 = new Struct(traitTypeName, traitValues); - - allTraits.add(trait1); - allTraits.addAll(superTypes); - } - } - - IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true); - - if (entity != null) { - AtlasEntityWithTraits atlasEntity = new AtlasEntityWithTraits(entity, allTraits); - ret.add(atlasEntity); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid); - } - } - - } - - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getAtlasEntities()"); - } - } - - return ret; - } - - private Map<String, Object> getTraitType(String traitName) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getTraitType(" + traitName + ")"); - } - Map<String, Object> ret = null; - - Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); - - Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class); - - List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(traitTypes)) { - ret = (Map<String, Object>) traitTypes.get(0); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getTraitType(" + traitName + ")"); - } - return ret; - } - - private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> getTraitSuperTypes()"); - } - List<IStruct> ret = new LinkedList<>(); - - if (traitType != null) { - - List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(superTypeNames)) { - for (String superTypeName : superTypeNames) { - - Map<String, Object> superTraitType = getTraitType(superTypeName); - - if (superTraitType != null) { - List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); - - Map<String, Object> superTypeValues = new HashMap<>(); - for (Map<String, Object> attributeDefinition : attributeDefinitions) { - - String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); - if (values.containsKey(attributeName)) { - superTypeValues.put(attributeName, values.get(attributeName)); - } - } - - List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values); - - Struct superTrait = new Struct(superTypeName, superTypeValues); - - ret.add(superTrait); - ret.addAll(superTraits); - } - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getTraitSuperTypes()"); - } - return ret; - } - - private Map<String, Object> atlasAPI(final String endpoint) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> atlasAPI(" + endpoint + ")"); - } - Map<String, Object> ret = new HashMap<String, Object>(); - - try { - UserGroupInformation userGroupInformation = null; - if (isKerberized) { - userGroupInformation = UserGroupInformation.getLoginUser(); - - try { - userGroupInformation.checkTGTAndReloginFromKeytab(); - } catch (IOException ioe) { - LOG.error("Error renewing TGT and relogin", ioe); - userGroupInformation = null; - } - } - if (userGroupInformation != null) { - LOG.debug("Using kerberos authentication"); - if(LOG.isDebugEnabled()) { - LOG.debug("Using Principal = "+ userGroupInformation.getUserName()); - } - ret = userGroupInformation.doAs(new PrivilegedAction<Map<String, Object>>() { - @Override - public Map<String, Object> run() { - try{ - return executeAtlasAPI(endpoint); - }catch (Exception e) { - LOG.error("Atlas API failed with message : ", e); - } - return null; - } - }); - } else { - LOG.debug("Using basic authentication"); - ret = executeAtlasAPI(endpoint); - } - } catch (Exception exception) { - LOG.error("Exception when fetching Atlas objects.", exception); - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== atlasAPI(" + endpoint + ")"); - } - return ret; - } - - private Map<String, Object> executeAtlasAPI(final String endpoint) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> executeAtlasAPI(" + endpoint + ")"); - } - - Map<String, Object> ret = new HashMap<String, Object>(); - - try { - final WebResource webResource = atlasRESTClient.getResource(endpoint); - - ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class); - - if (response != null && response.getStatus() == 200) { - ret = response.getEntity(ret.getClass()); - } else { - RESTResponse resp = RESTResponse.fromClientResponse(response); - LOG.error("Error getting atlas data request=" + webResource.toString() - + ", response=" + resp.toString()); - } - } catch (Exception exception) { - LOG.error("Exception when fetching Atlas objects.", exception); - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== executeAtlasAPI(" + endpoint + ")"); - } - - return ret; - } - - private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : null; - } - -}
