Repository: incubator-ranger
Updated Branches:
  refs/heads/master 5b86864a9 -> 49e890e26


RANGER-726: Updated tagsync for recent changes in Atlas API

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/49e890e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/49e890e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/49e890e2

Branch: refs/heads/master
Commit: 49e890e26360c742ccbf80d2741df7bec48c6319
Parents: 5b86864
Author: Abhay Kulkarni <[email protected]>
Authored: Mon Nov 9 18:50:52 2015 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Wed Nov 11 12:29:47 2015 -0800

----------------------------------------------------------------------
 .../atlas-client-0.5.0.2.3.1.1-19.jar           | Bin 34558 -> 0 bytes
 .../atlas-notification-0.5.0.2.3.1.1-19.jar     | Bin 34734 -> 0 bytes
 .../atlas-typesystem-0.5.0.2.3.1.1-19.jar       | Bin 355350 -> 0 bytes
 pom.xml                                         |   4 +-
 src/main/assembly/tagsync.xml                   |   3 +
 .../conf/templates/installprop2xml.properties   |   6 +-
 tagsync/pom.xml                                 |  18 +
 tagsync/scripts/install.properties              |   2 +-
 tagsync/scripts/setup.py                        |   5 +-
 .../source/atlas/AtlasNotificationMapper.java   |  97 +++--
 .../tagsync/source/atlas/AtlasUtility.java      | 404 -------------------
 .../tagsync/source/atlas/TagAtlasSource.java    |  78 ++--
 12 files changed, 132 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 1fb2ef7..0000000
Binary files a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 848eeb3..0000000
Binary files a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index f619b6e..0000000
Binary files a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0648d67..d60fca4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
                <jaxb-api.version>2.2.2</jaxb-api.version>
                <jackson.version>1.9.13</jackson.version>
                <sun-jersey-bundle.version>1.19</sun-jersey-bundle.version>
-               <atlas.version>0.5.0.2.3.1.1-19</atlas.version>
+               <atlas.version>0.6-incubating-SNAPSHOT</atlas.version>
                <distMgmtStagingId>apache.staging.https</distMgmtStagingId>
        <distMgmtStagingName>Apache Release Distribution 
Repository</distMgmtStagingName>
        
<distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
@@ -264,6 +264,7 @@
                 <enabled>false</enabled>
             </snapshots>
           </repository>
+    <!--
     <repository>
       <id>repo</id>
       <url>file://${basedir}/local-repo</url>
@@ -271,6 +272,7 @@
          <enabled>true</enabled>
       </snapshots>
   </repository>
+  -->
   </repositories>
        <dependencyManagement>
                <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 331dae0..8adc5cc 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -56,7 +56,10 @@
                                        
<include>org.apache.atlas:atlas-notification</include>
                                        
<include>org.apache.atlas:atlas-typesystem</include>
                                        
<include>org.apache.atlas:atlas-client</include>
+                                       
<include>org.apache.atlas:atlas-common</include>
                                        
<include>com.google.inject:guice</include>
+                                       
<include>com.google.inject.extensions:guice-multibindings</include>
+                                       
<include>org.codehaus.jettison:jettison</include>
                                        
<include>aopalliance:aopalliance</include>
                                        
<include>javax.inject:javax.inject</include>
                                        
<include>org.apache.kafka:kafka_2.10</include>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties
index 5b63835..101a1ba 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -34,8 +34,8 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 
ranger.tagsync.filesource.modtime.chec
 
 TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
 
-TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.notification.kafka.bootstrap.servers
-TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.notification.kafka.zookeeper.connect
-TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.notification.kafka.group.id
+TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers
+TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect
+TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id
 
 TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = 
ranger.tagsync.atlas.to.service.mapping

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index b800f61..c860c4a 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -30,8 +30,10 @@
         <version>0.5.0</version>
     </parent>
 
+    <!--
     <repositories>
 
+
         <repository>
             <id>repo</id>
             <url>file://${basedir}/../local-repo</url>
@@ -40,6 +42,7 @@
             </snapshots>
         </repository>
     </repositories>
+    -->
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -107,6 +110,16 @@
             <version>4.0</version>
         </dependency>
         <dependency>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-multibindings</artifactId>
+            <version>4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
             <version>${atlas.version}</version>
@@ -121,5 +134,10 @@
             <artifactId>atlas-client</artifactId>
             <version>${atlas.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-common</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties
index f7de6e3..b5ad580 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -53,7 +53,7 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000
 
 TAGSYNC_ATLAS_KAFKA_ENDPOINTS = localhost:6667
 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = localhost:2181
-TAGSYNC_ATLAS_CONSUMER_GROUP = entityConsumer
+TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer
 
 # Mapping from Atlas hive instance-name to Ranger service-name
 # this needs to be in format 
clusterName,componentType,serviceName;clusterName2,componentType2,serviceName2

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index e4b2433..f7455b8 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -317,8 +317,9 @@ def main():
        atlasOutFile = file(atlasOutFn, "a")
 
        atlasOutFile.write("atlas.notification.embedded=false" + "\n")
-       atlasOutFile.write("atlas.notification.kafka.acks=1" + "\n")
-       atlasOutFile.write("atlas.notification.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+       atlasOutFile.write("atlas.kafka.acks=1" + "\n")
+       atlasOutFile.write("atlas.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+       atlasOutFile.write("atlas.kafka.hook.group.id=atlas" + "\n")
 
        atlasOutFile.close()
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8046b68..7925b5c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,9 +19,10 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -58,7 +59,9 @@ class AtlasNotificationMapper {
                properties = props;
 
                try {
-                       if (isEntityMappable(entityNotification.getEntity())) {
+                       IReferenceableInstance entity = 
entityNotification.getEntity();
+
+                       if (isEntityMappable(entity)) {
                                ret = createServiceTags(entityNotification);
                        } else {
                                if(LOG.isDebugEnabled()) {
@@ -71,7 +74,7 @@ class AtlasNotificationMapper {
                return ret;
        }
 
-       static private boolean isEntityMappable(Entity entity) {
+       static private boolean isEntityMappable(IReferenceableInstance entity) {
                boolean ret = false;
 
                String entityTypeName = entity.getTypeName();
@@ -91,44 +94,43 @@ class AtlasNotificationMapper {
                ServiceTags ret = null;
 
                EntityNotification.OperationType opType = 
entityNotification.getOperationType();
-               Entity entity = entityNotification.getEntity();
 
-               String opName = entityNotification.getOperationType().name();
                switch (opType) {
-                       case ENTITY_CREATED: {
-                               LOG.debug("ENTITY_CREATED notification is not 
handled, as Ranger will get necessary information from any subsequent 
TRAIT_ADDED notification");
+                       case ENTITY_CREATE: {
+                               LOG.debug("ENTITY_CREATE notification is not 
handled, as Ranger will get necessary information from any subsequent 
TRAIT_ADDED notification");
                                break;
                        }
-                       case ENTITY_UPDATED: {
-                               ret = getServiceTags(entity);
+                       case ENTITY_UPDATE: {
+                               ret = getServiceTags(entityNotification);
                                if (MapUtils.isEmpty(ret.getTags())) {
                                        LOG.debug("No traits associated with 
this entity update notification. Ignoring it altogether");
                                        ret = null;
                                }
                                break;
                        }
-                       case TRAIT_ADDED:
-                       case TRAIT_DELETED: {
-                               ret = getServiceTags(entity);
+                       case TRAIT_ADD:
+                       case TRAIT_DELETE: {
+                               ret = getServiceTags(entityNotification);
                                break;
                        }
                        default:
-                               LOG.error(opName + ": unknown notification 
received - not handled");
+                               LOG.error(opType + ": unknown notification 
received - not handled");
                }
 
                return ret;
        }
 
-       static private ServiceTags getServiceTags(Entity entity) throws 
Exception {
+       static private ServiceTags getServiceTags(EntityNotification 
entityNotification) throws Exception {
                ServiceTags ret = null;
 
+               IReferenceableInstance entity = entityNotification.getEntity();
 
                List<RangerServiceResource> serviceResources = new 
ArrayList<RangerServiceResource>();
 
                RangerServiceResource serviceResource = 
getServiceResource(entity);
                serviceResources.add(serviceResource);
 
-               Map<Long, RangerTag> tags = getTags(entity);
+               Map<Long, RangerTag> tags = getTags(entityNotification);
 
                Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
 
@@ -163,7 +165,7 @@ class AtlasNotificationMapper {
        }
 
 
-       static private RangerServiceResource getServiceResource(Entity entity) 
throws Exception {
+       static private RangerServiceResource 
getServiceResource(IReferenceableInstance entity) throws Exception {
 
                RangerServiceResource ret = null;
 
@@ -224,7 +226,7 @@ class AtlasNotificationMapper {
 
 
                ret = new RangerServiceResource();
-               ret.setGuid(entity.getId().getGuid());
+               ret.setGuid(entity.getId()._getId());
                ret.setId(1L);
                ret.setServiceName(serviceName);
                ret.setResourceElements(elements);
@@ -232,22 +234,24 @@ class AtlasNotificationMapper {
                return ret;
        }
 
-       static private Map<Long, RangerTag> getTags(Entity entity) {
+       static private Map<Long, RangerTag> getTags(EntityNotification 
entityNotification) {
                Map<Long, RangerTag> ret = null;
 
-               Map<String, ? extends Trait> traits = entity.getTraits();
+               ret = new HashMap<Long, RangerTag>();
+
+               long index = 1;
+
+               List<IStruct> traits = entityNotification.getAllTraits();
+
+               for (IStruct trait : traits) {
 
-               if (MapUtils.isNotEmpty(traits)) {
-                       ret = new HashMap<Long, RangerTag>();
-                       long index = 1;
+                       String traitName = trait.getTypeName();
 
-                       for (Map.Entry<String, ? extends Trait> entry : 
traits.entrySet()) {
-                               String traitName = entry.getKey();
-                               Trait trait = entry.getValue();
+                       Map<String, String> tagAttrValues = new HashMap<String, 
String>();
 
-                               Map<String, Object> attrValues = 
trait.getValues();
+                       try {
 
-                               Map<String, String> tagAttrValues = new 
HashMap<String, String>();
+                               Map<String, Object> attrValues = 
trait.getValuesMap();
 
                                for (Map.Entry<String, Object> attrValueEntry : 
attrValues.entrySet()) {
                                        String attrName = 
attrValueEntry.getKey();
@@ -259,14 +263,17 @@ class AtlasNotificationMapper {
                                                LOG.error("Cannot cast 
attribute-value to String, skipping... attrName=" + attrName);
                                        }
                                }
+                       } catch (AtlasException exception) {
+                               LOG.error("Could not get values for trait:" + 
traitName, exception);
+                       }
 
-                               RangerTag tag = new RangerTag();
+                       RangerTag tag = new RangerTag();
 
-                               tag.setType(traitName);
-                               tag.setAttributes(tagAttrValues);
+                       tag.setType(traitName);
+                       tag.setAttributes(tagAttrValues);
+
+                       ret.put(index++, tag);
 
-                               ret.put(index++, tag);
-                       }
                }
 
                return ret;
@@ -289,13 +296,13 @@ class AtlasNotificationMapper {
                return ret;
        }
 
-       static private String[] getQualifiedNameComponents(Entity entity) {
+       static private String[] 
getQualifiedNameComponents(IReferenceableInstance entity) {
                String ret[] = new String[5];
 
                if (StringUtils.equals(entity.getTypeName(), 
ENTITY_TYPE_HIVE_DB)) {
 
-                       String clusterName = getAttribute(entity.getValues(), 
"clusterName", String.class);
-                       String name = getAttribute(entity.getValues(), "name", 
String.class);
+                       String clusterName = getEntityAttribute(entity, 
"clusterName", String.class);
+                       String name = getEntityAttribute(entity, "name", 
String.class);
 
                        ret[1] = clusterName;
                        ret[2] = name;
@@ -303,20 +310,20 @@ class AtlasNotificationMapper {
                        ret[0] = ret[1] + "." + ret[2];
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("----- Entity-Id:" + 
entity.getId().getGuid());
+                               LOG.debug("----- Entity-Id:" + 
entity.getId()._getId());
                                LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
                                LOG.debug("----- Entity-Cluster-Name:" + 
clusterName);
                                LOG.debug("----- Entity-Name:" + name);
                        }
                } else {
-                       String qualifiedName = getAttribute(entity.getValues(), 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+                       String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
                        String nameHierarchy[] = 
qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
 
                        int hierarchyLevels = nameHierarchy.length;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("----- Entity-Id:" + 
entity.getId().getGuid());
+                               LOG.debug("----- Entity-Id:" + 
entity.getId()._getId());
                                LOG.debug("----- Entity-Type-Name:" + 
entity.getTypeName());
                                LOG.debug("----- Entity-Qualified-Name:" + 
qualifiedName);
                                LOG.debug("-----        
Entity-Qualified-Name-Components -----");
@@ -351,6 +358,18 @@ class AtlasNotificationMapper {
                return TagSyncConfig.getServiceName(apacheComponent, 
instanceName, properties);
        }
 
+       static private <T> T getEntityAttribute(IReferenceableInstance entity, 
String name, Class<T> type) {
+               T ret = null;
+
+               try {
+                       Map<String, Object> valueMap = entity.getValuesMap();
+                       ret = getAttribute(valueMap, name, type);
+               } catch (AtlasException exception) {
+                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
+               }
+
+               return ret;
+       }
        static private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
                return type.cast(map.get(name));
        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
deleted file mode 100644
index 2548c36..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.*;
-
-
-// class AtlasUtil
-
-@SuppressWarnings("unchecked")
-public class AtlasUtility {
-
-       private static final Log LOG = LogFactory.getLog(AtlasUtility.class);
-
-       // Atlas APIs
-
-       public static final String API_ATLAS_TYPES = "api/atlas/types";
-       public static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
-       public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
-       public static final String API_ATLAS_TYPE = "api/atlas/types/";
-
-       public static final String RESULTS_ATTRIBUTE = "results";
-       public static final String DEFINITION_ATTRIBUTE = "definition";
-       public static final String VALUES_ATTRIBUTE = "values";
-       public static final String TRAITS_ATTRIBUTE = "traits";
-       public static final String TYPE_NAME_ATTRIBUTE = "typeName";
-       public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
-       public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
-       public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
-       public static final String NAME_ATTRIBUTE = "name";
-
-       private Type mapType = new TypeToken<Map<String, Object>>() {
-       }.getType();
-
-       private RangerRESTClient restClient;
-       private Map<String, Entity> entities = new LinkedHashMap<>();
-
-
-       // ----- Constructor 
------------------------------------------------------
-
-       public AtlasUtility(Properties properties) {
-
-               String url = TagSyncConfig.getAtlasEndpoint(properties);
-               String sslConfigFileName = 
TagSyncConfig.getAtlasSslConfigFileName(properties);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Initializing RangerRestClient with (url=" + 
url + ", sslConfigFileName" + sslConfigFileName + ")");
-               }
-
-               restClient = new RangerRESTClient(url, sslConfigFileName);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Initialized RangerRestClient with (url=" + 
url + ", sslConfigFileName=" + sslConfigFileName + ")");
-               }
-       }
-
-       // update the set of entities with current from Atlas
-       public void refreshAllEntities() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.refreshAllEntities()");
-               }
-
-               try {
-                       entities.clear();
-                       entities.putAll(getAllEntities());
-               } catch (IOException e) {
-                       LOG.error("getAllEntities() failed", e);
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.refreshAllEntities()");
-               }
-       }
-
-       // ----- AtlasUtility 
------------------------------------------------------
-
-       public Map<String, Entity> getAllEntities() throws IOException {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.getAllEntities()");
-               }
-               Map<String, Entity> entities = new LinkedHashMap<>();
-
-               Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-               List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-               for (String type : types) {
-
-                       Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
-
-                       List<String> guids = getAttribute(entitiesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-                       for (String guid : guids) {
-
-                               if (StringUtils.isNotBlank(guid)) {
-
-                                       Map<Trait, Map<String, ? extends 
Trait>> traitSuperTypes = new HashMap<>();
-
-                                       Map<String, Object> entityResponse = 
atlasAPI(API_ATLAS_ENTITY + guid);
-
-                                       if 
(entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                                               String definitionJSON = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
-                                               LOG.info("{");
-                                               LOG.info("      \"entity-id\":" 
+ guid + ",");
-                                               LOG.info("      
\"entity-definition\":" + definitionJSON);
-                                               LOG.info("}");
-
-                                               Map<String, Object> definition 
= new Gson().fromJson(definitionJSON, mapType);
-
-                                               Map<String, Object> values = 
getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
-                                               Map<String, Object> traits = 
getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-                                               String typeName = 
getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                               LOG.info("Received 
entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
-                                               Map<String, TraitImpl> traitMap 
= new HashMap<>();
-
-                                               if 
(MapUtils.isNotEmpty(traits)) {
-
-                                                       LOG.info("Traits for 
entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
-
-                                                       for (Map.Entry<String, 
Object> entry : traits.entrySet()) {
-
-                                                               Map<String, 
Object> trait = (Map<String, Object>) entry.getValue();
-
-                                                               Map<String, 
Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-                                                               String 
traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                               Map<String, 
TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), 
traitValues);
-
-                                                               TraitImpl 
trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
-                                                               
traitSuperTypes.put(trait1, superTypes);
-
-                                                               
traitMap.put(entry.getKey(), trait1);
-
-
-                                                               LOG.info("      
                Trait(typeName=" + traitTypeName + ")");
-
-                                                       }
-                                               } else {
-                                                       LOG.info("No traits for 
entity(typeName=" + typeName + ", id=" + guid + ")");
-                                               }
-                                               EntityImpl entity = new 
EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
-                                               showEntity(entity);
-
-                                               entities.put(guid, entity);
-
-                                       }
-                               }
-                       }
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.getAllEntities()");
-               }
-               return entities;
-       }
-
-
-       // ----- helper methods 
----------------------------------------------------
-
-       private Map<String, Object> getTraitType(String traitName)
-                       throws IOException {
-
-               Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + 
traitName);
-
-               if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-                       String definitionJSON = getAttribute(typeResponse, 
DEFINITION_ATTRIBUTE, String.class);
-
-                       Map<String, Object> definition = new 
Gson().fromJson(definitionJSON, mapType);
-
-                       List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
-
-                       if (traitTypes.size() > 0) {
-                               return (Map<String, Object>) traitTypes.get(0);
-                       }
-               }
-               return null;
-       }
-
-       private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> 
traitType, Map<String, Object> values)
-                       throws IOException {
-
-               Map<String, TraitImpl> superTypes = new HashMap<>();
-
-               if (traitType != null) {
-
-                       List<String> superTypeNames = getAttribute(traitType, 
SUPER_TYPES_ATTRIBUTE, List.class);
-
-                       for (String superTypeName : superTypeNames) {
-
-                               Map<String, Object> superTraitType = 
getTraitType(superTypeName);
-
-                               if (superTraitType != null) {
-                                       List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-                                       Map<String, Object> superTypeValues = 
new HashMap<>();
-                                       for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
-
-                                               String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                               if 
(values.containsKey(attributeName)) {
-                                                       
superTypeValues.put(attributeName, values.get(attributeName));
-                                               }
-                                       }
-
-                                       superTypes.put(superTypeName,
-                                                       //new 
TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, 
superTypeName));
-                                                       new 
TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, 
superTypeValues)));
-                               }
-                       }
-               }
-               return superTypes;
-       }
-
-
-       /*
-               private Map<String, Object> atlasAPI(String endpoint) throws 
IOException {
-                       InputStream in = streamProvider.readFrom(atlasEndpoint 
+ endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
-                       return new Gson().fromJson(IOUtils.toString(in, 
"UTF-8"), mapType);
-               }
-               */
-
-
-       private Map<String, Object> atlasAPI(String endpoint) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + 
")");
-               }
-               // Create a REST client and perform a get on it
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               WebResource webResource = restClient.getResource(endpoint);
-
-               ClientResponse response = 
webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-               if (response != null && response.getStatus() == 200) {
-                       ret = response.getEntity(ret.getClass());
-               } else {
-                       LOG.error("Atlas REST call returned with response={" + 
response + "}");
-
-                       RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-                       LOG.error("Error getting Atlas Entity. request=" + 
webResource.toString()
-                                       + ", response=" + resp.toString());
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + 
")");
-               }
-               return ret;
-       }
-
-       private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
-               return type.cast(map.get(name));
-       }
-
-       public void showEntity(Entity entity) {
-
-               LOG.debug("Entity-id    :" + entity.getId());
-
-               LOG.debug("Type:                " + entity.getTypeName());
-
-               LOG.debug("----- Values -----");
-
-               for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
-                       LOG.debug("             Name:   " + entry.getKey() + 
"");
-                       Object value = entry.getValue();
-                       LOG.debug("             Value:  " + getValue(value, 
entities.keySet()));
-               }
-
-               LOG.debug("----- Traits -----");
-
-               for (String traitName : entity.getTraits().keySet()) {
-                       LOG.debug("             Name:" + entity.getId() + ", 
trait=" + traitName + ">" + traitName);
-               }
-
-       }
-
-       public void showTrait(Entity entity, String traitId) {
-
-               String[] traitNames = traitId.split(",");
-
-               Trait trait = entity.getTraits().get(traitNames[0]);
-
-               for (int i = 1; i < traitNames.length; ++i) {
-                       trait = trait.getSuperTypes().get(traitNames[i]);
-               }
-
-               String typeName = trait.getTypeName();
-
-               LOG.debug("Trait " + typeName + " for Entity id=" + 
entity.getId());
-
-               LOG.debug("Type: " + typeName);
-
-               LOG.debug("----- Values ------");
-
-               for (Map.Entry<String, Object> entry : 
trait.getValues().entrySet()) {
-                       LOG.debug("Name:" + entry.getKey());
-                       Object value = entry.getValue();
-                       LOG.debug("Value:" + getValue(value, 
entities.keySet()));
-               }
-
-               LOG.debug("Super Traits");
-
-
-               for (String traitName : trait.getSuperTypes().keySet()) {
-                       LOG.debug("Name=" + entity.getId() + "&trait=" + 
traitId + "," + traitName + ">" + traitName);
-               }
-       }
-
-       // resolve the given value if necessary
-       private String getValue(Object value, Set<String> ids) {
-               if (value == null) {
-                       return "";
-               }
-               String idString = getIdValue(value, ids);
-               if (idString != null) {
-                       return idString;
-               }
-
-               idString = getIdListValue(value, ids);
-               if (idString != null) {
-                       return idString;
-               }
-
-               return value.toString();
-       }
-
-       // get an id from the given value; return null if the value is not an 
id type
-       private String getIdValue(Object value, Set<String> ids) {
-               if (value instanceof Map) {
-                       Map map = (Map) value;
-                       if (map.size() == 3 && map.containsKey("id")) {
-                               String id = map.get("id").toString();
-                               if (ids.contains(id)) {
-                                       return id;
-                               }
-                       }
-               }
-               return null;
-       }
-
-       // get an id list from the given value; return null if the value is not 
an id list type
-       private String getIdListValue(Object value, Set<String> ids) {
-               if (value instanceof List) {
-                       List list = (List) value;
-                       if (list.size() > 0) {
-                               StringBuilder sb = new StringBuilder();
-                               for (Object o : list) {
-                                       String idString = getIdValue(o, ids);
-                                       if (idString == null) {
-                                               return value.toString();
-                                       }
-                                       if (sb.length() > 0) {
-                                               sb.append(", ");
-                                       }
-                                       sb.append(idString);
-                               }
-                               return sb.toString();
-                       }
-               }
-               return null;
-       }
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
index 2725b23..fd64d12 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -23,17 +23,21 @@ import com.google.gson.Gson;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 
+import org.apache.atlas.AtlasException;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationConsumer;
-import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
 import org.apache.ranger.plugin.util.ServiceTags;
@@ -47,9 +51,9 @@ public class TagAtlasSource implements TagSource {
 
        public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = 
"application.properties";
 
-       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.notification.kafka.bootstrap.servers";
-       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.notification.kafka.zookeeper.connect";
-       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.notification.kafka.group.id";
+       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = 
"atlas.kafka.bootstrap.servers";
+       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = 
"atlas.kafka.zookeeper.connect";
+       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = 
"atlas.kafka.entities.group.id";
 
        private TagSink tagSink;
        private Properties properties;
@@ -112,9 +116,11 @@ public class TagAtlasSource implements TagSource {
 
                        Injector injector = 
Guice.createInjector(notificationModule);
 
-                       EntityNotificationConsumerProvider consumerProvider = 
injector.getInstance(EntityNotificationConsumerProvider.class);
+                       Provider<NotificationInterface> consumerProvider = 
injector.getProvider(NotificationInterface.class);
+                       NotificationInterface notification = 
consumerProvider.get();
+                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
 
-                       consumerTask = new 
ConsumerRunnable(consumerProvider.get());
+                       consumerTask = new ConsumerRunnable(iterators.get(0));
                }
 
                if (LOG.isDebugEnabled()) {
@@ -158,26 +164,25 @@ public class TagAtlasSource implements TagSource {
 
        private class ConsumerRunnable implements Runnable {
 
-               private final EntityNotificationConsumer consumer;
+               private final Iterator<EntityNotification> consumerIterator;
 
-               private ConsumerRunnable(EntityNotificationConsumer consumer) {
-                       this.consumer = consumer;
+               private ConsumerRunnable(Iterator<EntityNotification> 
consumerIterator) {
+                       this.consumerIterator = consumerIterator;
                }
 
-
                // ----- Runnable 
--------------------------------------------------------
 
                @Override
                public void run() {
-                       while (consumer.hasNext()) {
+                       while (consumerIterator.hasNext()) {
                                try {
-                                       EntityNotification notification = 
consumer.next();
+                                       EntityNotification notification = 
consumerIterator.next();
                                        if (notification != null) {
                                                printNotification(notification);
                                                ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification, properties);
                                                if (serviceTags == null) {
                                                        
if(LOG.isDebugEnabled()) {
-                                                               LOG.debug("Did 
not create ServiceTags structure for notification type:" + 
notification.getOperationType().name());
+                                                               LOG.debug("Did 
not create ServiceTags structure for notification type:" + 
notification.getOperationType());
                                                        }
                                                } else {
                                                        if 
(LOG.isDebugEnabled()) {
@@ -199,33 +204,36 @@ public class TagAtlasSource implements TagSource {
                }
 
                public void printNotification(EntityNotification notification) {
-                       Entity entity = notification.getEntity();
+                       IReferenceableInstance entity = 
notification.getEntity();
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Notification-Type: " + 
notification.getOperationType().name());
-                               LOG.debug("Entity-Id: " + 
entity.getId().getGuid());
-                               LOG.debug("Entity-Type: " + 
entity.getTypeName());
+                               try {
+                                       LOG.debug("Notification-Type: " + 
notification.getOperationType());
+                                       LOG.debug("Entity-Id: " + 
entity.getId()._getId());
+                                       LOG.debug("Entity-Type: " + 
entity.getTypeName());
 
-                               LOG.debug("----------- Entity Values 
----------");
+                                       LOG.debug("----------- Entity Values 
----------");
 
 
-                               for (Map.Entry<String, Object> entry : 
entity.getValues().entrySet()) {
-                                       LOG.debug("             Name:" + 
entry.getKey());
-                                       Object value = entry.getValue();
-                                       LOG.debug("             Value:" + 
value);
-                               }
+                                       for (Map.Entry<String, Object> entry : 
entity.getValuesMap().entrySet()) {
+                                               LOG.debug("             Name:" 
+ entry.getKey());
+                                               Object value = entry.getValue();
+                                               LOG.debug("             Value:" 
+ value);
+                                       }
 
-                               LOG.debug("----------- Entity Traits 
----------");
+                                       LOG.debug("----------- Entity Traits 
----------");
 
+                                       List<IStruct> traits = 
notification.getAllTraits();
 
-                               for (Map.Entry<String, ? extends Trait> entry : 
entity.getTraits().entrySet()) {
-                                       LOG.debug("                     
Trait-Name:" + entry.getKey());
-                                       Trait trait = entry.getValue();
-                                       LOG.debug("                     
Trait-Type:" + trait.getTypeName());
-                                       Map<String, Object> traitValues = 
trait.getValues();
-                                       for (Map.Entry<String, Object> 
valueEntry : traitValues.entrySet()) {
-                                               LOG.debug("                     
        Trait-Value-Name:" + valueEntry.getKey());
-                                               LOG.debug("                     
        Trait-Value:" + valueEntry.getValue());
+                                       for (IStruct trait : traits) {
+                                               LOG.debug("                     
Trait-Type-Name:" + trait.getTypeName());
+                                               Map<String, Object> traitValues 
= trait.getValuesMap();
+                                               for (Map.Entry<String, Object> 
valueEntry : traitValues.entrySet()) {
+                                                       LOG.debug("             
                Trait-Value-Name:" + valueEntry.getKey());
+                                                       LOG.debug("             
                Trait-Value:" + valueEntry.getValue());
+                                               }
                                        }
+                               } catch (AtlasException exception) {
+                                       LOG.error("Cannot print notification - 
", exception);
                                }
                        }
                }


Reply via email to