Repository: atlas
Updated Branches:
  refs/heads/master 9c58d30c7 -> a3374c747


http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index b05a9a3..d01fb9f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -30,15 +30,18 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasRelationshipType;
@@ -58,13 +61,32 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_PROPAGATE_KEY;
+import static 
org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.edgeExists;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static 
org.apache.atlas.repository.graph.GraphHelper.removePropagatedTraitNameFromVertex;
 import static 
org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
@@ -191,6 +213,17 @@ public final class EntityGraphRetriever {
         return ret;
     }
 
+    /**
+     * Builds an {@link AtlasClassification} from a classification vertex.
+     *
+     * The classification's type name, owning-entity guid and propagate flag are read from the
+     * vertex, after which the vertex attributes are mapped onto the new instance.
+     *
+     * @param classificationVertex vertex that stores the classification
+     * @return the populated classification
+     * @throws AtlasBaseException if mapping the vertex attributes fails
+     */
+    public AtlasClassification toAtlasClassification(AtlasVertex classificationVertex) throws AtlasBaseException {
+        final String  typeName   = getTypeName(classificationVertex);
+        final String  entityGuid = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class);
+        final Boolean propagate  = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class);
+
+        final AtlasClassification classification = new AtlasClassification(typeName);
+
+        classification.setEntityGuid(entityGuid);
+        classification.setPropagate(propagate);
+
+        mapAttributes(classificationVertex, classification, null);
+
+        return classification;
+    }
+
     public AtlasVertex getReferencedEntityVertex(AtlasEdge edge, 
AtlasRelationshipEdgeDirection relationshipDirection, AtlasVertex parentVertex) 
throws AtlasBaseException {
         AtlasVertex entityVertex = null;
 
@@ -245,7 +278,7 @@ public final class EntityGraphRetriever {
     }
 
     private AtlasEntity mapVertexToAtlasEntity(AtlasVertex entityVertex, 
AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        String      guid   = GraphHelper.getGuid(entityVertex);
+        String      guid   = getGuid(entityVertex);
         AtlasEntity entity = entityExtInfo != null ? 
entityExtInfo.getEntity(guid) : null;
 
         if (entity == null) {
@@ -265,7 +298,7 @@ public final class EntityGraphRetriever {
 
             mapRelationshipAttributes(entityVertex, entity);
 
-            mapClassifications(entityVertex, entity, entityExtInfo);
+            mapClassifications(entityVertex, entity);
         }
 
         return entity;
@@ -284,7 +317,7 @@ public final class EntityGraphRetriever {
         ret.setTypeName(typeName);
         ret.setGuid(guid);
         ret.setStatus(GraphHelper.getStatus(entityVertex));
-        ret.setClassificationNames(GraphHelper.getTraitNames(entityVertex));
+        ret.setClassificationNames(getAllTraitNames(entityVertex));
 
         AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(typeName);
 
@@ -347,8 +380,8 @@ public final class EntityGraphRetriever {
             LOG.debug("Mapping system attributes for type {}", 
entity.getTypeName());
         }
 
-        entity.setGuid(GraphHelper.getGuid(entityVertex));
-        entity.setTypeName(GraphHelper.getTypeName(entityVertex));
+        entity.setGuid(getGuid(entityVertex));
+        entity.setTypeName(getTypeName(entityVertex));
         entity.setStatus(GraphHelper.getStatus(entityVertex));
         entity.setVersion(GraphHelper.getVersion(entityVertex));
 
@@ -377,28 +410,52 @@ public final class EntityGraphRetriever {
         }
     }
 
-    public List<AtlasClassification> getClassifications(String guid) throws 
AtlasBaseException {
+    public boolean isPropagationEnabled(AtlasVertex classificationVertex) {
+        boolean ret = false;
 
+        if (classificationVertex != null) {
+            Boolean enabled = 
AtlasGraphUtilsV1.getProperty(classificationVertex, 
CLASSIFICATION_PROPAGATE_KEY, Boolean.class);
+
+            ret = enabled == null ? true : enabled;
+        }
+
+        return ret;
+    }
+
+    public List<AtlasClassification> getClassifications(String guid) throws 
AtlasBaseException {
         AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
         if (instanceVertex == null) {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        return getClassifications(instanceVertex, null);
+        return getClassifications(instanceVertex);
     }
 
     public List<AtlasClassification> getClassifications(AtlasVertex 
instanceVertex) throws AtlasBaseException {
-        return getClassifications(instanceVertex, null);
+        final List<AtlasClassification> classifications           = 
getClassifications(instanceVertex, null);
+        final List<AtlasClassification> propagatedClassifications = 
getPropagatedClassifications(instanceVertex, null);
+
+        classifications.addAll(propagatedClassifications);
+
+        return classifications;
     }
 
     public AtlasClassification getClassification(String guid, String 
classificationName) throws AtlasBaseException {
-
         AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
         if (instanceVertex == null) {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        List<AtlasClassification> classifications = 
getClassifications(instanceVertex, classificationName);
+        List<AtlasClassification> classifications = null;
+
+        try {
+            classifications = getClassifications(instanceVertex, 
classificationName);
+        } catch (AtlasBaseException excp) {
+            // ignore and look for propagated classifications
+            classifications = getPropagatedClassifications(instanceVertex, 
classificationName);
+        }
 
         if(CollectionUtils.isEmpty(classifications)) {
             throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
@@ -409,54 +466,141 @@ public final class EntityGraphRetriever {
 
 
     private List<AtlasClassification> getClassifications(AtlasVertex 
instanceVertex, String classificationNameFilter) throws AtlasBaseException {
-        List<AtlasClassification> classifications = new ArrayList<>();
-        List<String> classificationNames = 
GraphHelper.getTraitNames(instanceVertex);
+        List<AtlasClassification> ret                 = new ArrayList<>();
+        List<String>              classificationNames = 
getTraitNames(instanceVertex);
 
         if (CollectionUtils.isNotEmpty(classificationNames)) {
-            for (String classificationName : classificationNames) {
-                AtlasClassification classification;
-                if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                    if (classificationName.equals(classificationNameFilter)) {
-                        classification = getClassification(instanceVertex, 
classificationName);
-                        classifications.add(classification);
-                        return classifications;
-                    }
-                } else {
-                    classification = getClassification(instanceVertex, 
classificationName);
-                    classifications.add(classification);
+            if (StringUtils.isNotEmpty(classificationNameFilter)) {
+                if (classificationNames.contains(classificationNameFilter)) {
+                    ret.add(getClassification(instanceVertex, 
classificationNameFilter));
+                }
+            } else {
+                for (String classificationName : classificationNames) {
+                    ret.add(getClassification(instanceVertex, 
classificationName));
                 }
             }
+        }
+
+        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) 
{
+            throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, 
classificationNameFilter);
+        }
+
+        return ret;
+    }
 
+    private List<AtlasClassification> getPropagatedClassifications(AtlasVertex 
instanceVertex, String classificationNameFilter) throws AtlasBaseException {
+        List<AtlasClassification> ret                 = new ArrayList<>();
+        List<String>              classificationNames = 
getPropagatedTraitNames(instanceVertex);
 
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
             if (StringUtils.isNotEmpty(classificationNameFilter)) {
-                //Should not reach here if classification present
-                throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, 
classificationNameFilter);
+                if (classificationNames.contains(classificationNameFilter)) {
+                    ret.addAll(getAllPropagatedClassifications(instanceVertex, 
classificationNameFilter));
+                }
+            } else {
+                for (String classificationName : new 
HashSet<>(classificationNames)) {
+                    ret.addAll(getAllPropagatedClassifications(instanceVertex, 
classificationName));
+                }
             }
         }
-        return classifications;
+
+        if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) 
{
+            throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, 
classificationNameFilter);
+        }
+
+        return ret;
     }
 
-    private AtlasClassification getClassification(AtlasVertex instanceVertex, 
String classificationName) throws AtlasBaseException {
+    private List<AtlasClassification> 
getAllPropagatedClassifications(AtlasVertex vertex, String classificationName) 
throws AtlasBaseException {
+        List<AtlasClassification> ret       = new ArrayList<>();
+        String                    edgeLabel = 
getPropagatedEdgeLabel(classificationName);
+        Iterable<AtlasEdge>       edges     = 
vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
 
-        AtlasClassification ret = null;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("mapping classification {} to atlas entity", 
classificationName);
+        if (edges != null) {
+            for (Iterator<AtlasEdge> iterator = edges.iterator(); 
iterator.hasNext(); ) {
+                AtlasEdge edge = iterator.next();
+
+                if (edge != null) {
+                    AtlasClassification classification = 
toAtlasClassification(edge.getInVertex());
+
+                    ret.add(classification);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    protected List<AtlasVertex> 
getPropagationEnabledClassificationVertices(AtlasVertex instanceVertex) {
+        List<AtlasVertex> ret                              = new ArrayList<>();
+        List<AtlasVertex> classificationVertices           = 
getPropagationEnabledClassificationVertices(instanceVertex, false);
+        List<AtlasVertex> propagatedClassificationVertices = 
getPropagationEnabledClassificationVertices(instanceVertex, true);
+
+        if (CollectionUtils.isNotEmpty(classificationVertices)) {
+            ret.addAll(classificationVertices);
+        }
+
+        if (CollectionUtils.isNotEmpty(propagatedClassificationVertices)) {
+            ret.addAll(propagatedClassificationVertices);
+        }
+
+        return ret;
+    }
+
+    private List<AtlasVertex> 
getPropagationEnabledClassificationVertices(AtlasVertex vertex, boolean 
propagated) {
+        List<AtlasVertex> ret                 = new ArrayList<>();
+        List<String>      classificationNames = (propagated) ? 
getPropagatedTraitNames(vertex) : getTraitNames(vertex);
+
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
+            for (String classificationName : classificationNames) {
+                String              traitLabel   = (propagated) ? 
getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
+                Iterable<AtlasEdge> edges        = 
vertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
+
+                if (edges != null) {
+                    for (Iterator<AtlasEdge> iterator = edges.iterator(); 
iterator.hasNext(); ) {
+                        AtlasEdge edge = iterator.next();
+
+                        if (edge != null) {
+                            AtlasVertex classificationVertex = 
edge.getInVertex();
+
+                            if (isPropagationEnabled(classificationVertex)) {
+                                ret.add(classificationVertex);
+                            }
+                        }
+                    }
+                }
+            }
         }
 
-        Iterable<AtlasEdge> edges = 
instanceVertex.getEdges(AtlasEdgeDirection.OUT, classificationName);
-        AtlasEdge           edge  = (edges != null && 
edges.iterator().hasNext()) ? edges.iterator().next() : null;
+        return ret;
+    }
 
-        if (edge != null) {
-            ret = new AtlasClassification(classificationName);
-            mapAttributes(edge.getInVertex(), ret, null);
+    public AtlasClassification getClassification(AtlasVertex instanceVertex, 
String classificationName) throws AtlasBaseException {
+        AtlasClassification ret = getClassification(instanceVertex, 
classificationName, false);
+
+        // if no classification with the given name was directly associated, 
look for a propagated classification
+        if (ret == null) {
+            ret = getClassification(instanceVertex, classificationName, true);
         }
 
         return ret;
     }
 
-    private void mapClassifications(AtlasVertex entityVertex, AtlasEntity 
entity, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException {
-        final List<AtlasClassification> classifications = 
getClassifications(entityVertex, null);
+    /**
+     * Looks up a single classification of the given name on an entity vertex.
+     *
+     * The first outgoing edge carrying the trait label (or the propagated-edge label when
+     * {@code propagated} is true) is followed to its in-vertex, which is converted to an
+     * {@link AtlasClassification}.
+     *
+     * @param instanceVertex     entity vertex to inspect
+     * @param classificationName name of the classification to look for
+     * @param propagated         true to follow the propagated-classification edge label, false for the direct trait label
+     * @return the classification, or null when no matching edge exists
+     * @throws AtlasBaseException if converting the classification vertex fails
+     */
+    private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName, boolean propagated) throws AtlasBaseException {
+        String              traitLabel = propagated ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName);
+        Iterable<AtlasEdge> edges      = instanceVertex.getEdges(AtlasEdgeDirection.OUT, traitLabel);
+        // obtain the iterator exactly once: asking the Iterable for a second iterator repeats the
+        // edge lookup and is only correct when the Iterable can be traversed more than once
+        Iterator<AtlasEdge> iterator   = edges != null ? edges.iterator() : null;
+        AtlasEdge           edge       = (iterator != null && iterator.hasNext()) ? iterator.next() : null;
+
+        return edge != null ? toAtlasClassification(edge.getInVertex()) : null;
+    }
+
+    private void mapClassifications(AtlasVertex entityVertex, AtlasEntity 
entity) throws AtlasBaseException {
+        final List<AtlasClassification> classifications           = 
getClassifications(entityVertex, null);
+        final List<AtlasClassification> propagatedClassifications = 
getPropagatedClassifications(entityVertex, null);
+
         entity.setClassifications(classifications);
+        entity.addClassifications(propagatedClassifications);
     }
 
     private Object mapVertexToAttribute(AtlasVertex entityVertex, 
AtlasAttribute attribute, AtlasEntityExtInfo entityExtInfo) throws 
AtlasBaseException {
@@ -664,7 +808,7 @@ public final class EntityGraphRetriever {
                         ret = AtlasTypeUtil.getAtlasObjectId(entity);
                     }
                 } else {
-                    ret = new 
AtlasObjectId(GraphHelper.getGuid(referenceVertex), 
GraphHelper.getTypeName(referenceVertex));
+                    ret = new AtlasObjectId(getGuid(referenceVertex), 
getTypeName(referenceVertex));
                 }
             }
         }
@@ -681,7 +825,7 @@ public final class EntityGraphRetriever {
 
         if (GraphHelper.elementExists(edge)) {
             final AtlasVertex referenceVertex = edge.getInVertex();
-            ret = new AtlasStruct(GraphHelper.getTypeName(referenceVertex));
+            ret = new AtlasStruct(getTypeName(referenceVertex));
 
             mapAttributes(referenceVertex, ret, entityExtInfo);
         }
@@ -756,11 +900,11 @@ public final class EntityGraphRetriever {
         Iterator<AtlasEdge>        edges = null;
 
         if (attribute.getRelationshipEdgeDirection() == IN) {
-            edges = graphHelper.getIncomingEdgesByLabel(entityVertex, 
attribute.getRelationshipEdgeLabel());
+            edges = getIncomingEdgesByLabel(entityVertex, 
attribute.getRelationshipEdgeLabel());
         } else if (attribute.getRelationshipEdgeDirection() == OUT) {
-            edges = graphHelper.getOutGoingEdgesByLabel(entityVertex, 
attribute.getRelationshipEdgeLabel());
+            edges = getOutGoingEdgesByLabel(entityVertex, 
attribute.getRelationshipEdgeLabel());
         } else if (attribute.getRelationshipEdgeDirection() == BOTH) {
-            edges = graphHelper.getAdjacentEdgesByLabel(entityVertex, 
AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel());
+            edges = getAdjacentEdgesByLabel(entityVertex, 
AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel());
         }
 
         if (edges != null) {
@@ -787,8 +931,8 @@ public final class EntityGraphRetriever {
             }
 
             if (referenceVertex != null) {
-                String            entityTypeName = 
GraphHelper.getTypeName(referenceVertex);
-                String            entityGuid     = 
GraphHelper.getGuid(referenceVertex);
+                String            entityTypeName = 
getTypeName(referenceVertex);
+                String            entityGuid     = getGuid(referenceVertex);
                 AtlasRelationship relationship   = 
mapEdgeToAtlasRelationship(edge);
 
                 ret = new AtlasRelatedObjectId(entityGuid, entityTypeName, 
relationship.getGuid(),
@@ -835,8 +979,8 @@ public final class EntityGraphRetriever {
             LOG.debug("Mapping system attributes for relationship");
         }
 
-        relationship.setGuid(GraphHelper.getGuid(edge));
-        relationship.setTypeName(GraphHelper.getTypeName(edge));
+        relationship.setGuid(getRelationshipGuid(edge));
+        relationship.setTypeName(getTypeName(edge));
 
         relationship.setCreatedBy(GraphHelper.getCreatedByAsString(edge));
         relationship.setUpdatedBy(GraphHelper.getModifiedByAsString(edge));
@@ -856,11 +1000,11 @@ public final class EntityGraphRetriever {
         AtlasVertex end1Vertex = edge.getOutVertex();
         AtlasVertex end2Vertex = edge.getInVertex();
 
-        relationship.setEnd1(new 
AtlasObjectId(GraphHelper.getGuid(end1Vertex), 
GraphHelper.getTypeName(end1Vertex)));
-        relationship.setEnd2(new 
AtlasObjectId(GraphHelper.getGuid(end2Vertex), 
GraphHelper.getTypeName(end2Vertex)));
+        relationship.setEnd1(new AtlasObjectId(getGuid(end1Vertex), 
getTypeName(end1Vertex)));
+        relationship.setEnd2(new AtlasObjectId(getGuid(end2Vertex), 
getTypeName(end2Vertex)));
 
         relationship.setLabel(edge.getLabel());
-        relationship.setPropagateTags(GraphHelper.getPropagateTags(edge));
+        relationship.setPropagateTags(getPropagateTags(edge));
 
         return relationship;
     }
@@ -881,4 +1025,152 @@ public final class EntityGraphRetriever {
             relationship.setAttribute(attribute.getName(), attrValue);
         }
     }
-}
+
+    /**
+     * Propagates classifications across a relationship edge in the direction(s) requested.
+     *
+     * {@code ONE_TO_TWO} propagates from the edge's out-vertex to its in-vertex, {@code TWO_TO_ONE}
+     * from the in-vertex to the out-vertex, and {@code BOTH} does both. Any other value, or a null
+     * edge, results in no action.
+     *
+     * @param edge          relationship edge; ignored when null
+     * @param propagateTags direction in which tags are to be propagated
+     * @throws AtlasBaseException if propagation fails
+     */
+    public void addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
+        if (edge != null) {
+            final AtlasVertex end1Vertex = edge.getOutVertex();
+            final AtlasVertex end2Vertex = edge.getInVertex();
+            final boolean     bothWays   = propagateTags == PropagateTags.BOTH;
+
+            if (bothWays || propagateTags == PropagateTags.ONE_TO_TWO) {
+                addTagPropagation(end1Vertex, end2Vertex, edge);
+            }
+
+            if (bothWays || propagateTags == PropagateTags.TWO_TO_ONE) {
+                addTagPropagation(end2Vertex, end1Vertex, edge);
+            }
+        }
+    }
+
+    /**
+     * Withdraws classifications that were propagated across a relationship edge, in the direction(s) requested.
+     *
+     * {@code ONE_TO_TWO} handles propagation from the edge's out-vertex to its in-vertex,
+     * {@code TWO_TO_ONE} from the in-vertex to the out-vertex, and {@code BOTH} handles both.
+     * Any other value, or a null edge, results in no action.
+     *
+     * @param edge          relationship edge; ignored when null
+     * @param propagateTags direction in which tags had been propagated
+     * @throws AtlasBaseException if removal fails
+     */
+    public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
+        if (edge != null) {
+            final AtlasVertex end1Vertex = edge.getOutVertex();
+            final AtlasVertex end2Vertex = edge.getInVertex();
+            final boolean     bothWays   = propagateTags == PropagateTags.BOTH;
+
+            if (bothWays || propagateTags == PropagateTags.ONE_TO_TWO) {
+                removeTagPropagation(end1Vertex, end2Vertex, edge);
+            }
+
+            if (bothWays || propagateTags == PropagateTags.TWO_TO_ONE) {
+                removeTagPropagation(end2Vertex, end1Vertex, edge);
+            }
+        }
+    }
+
+    /**
+     * Propagates the propagation-enabled classifications of {@code fromVertex} to the entities
+     * impacted through {@code toVertex}.
+     *
+     * For each classification and each impacted entity a propagated-classification edge is added
+     * and the classification name is appended to the entity's propagated-trait-names property,
+     * unless the entity already has a direct or propagated edge to that classification vertex or
+     * the classification type cannot be applied to the entity's type.
+     *
+     * @param fromVertex entity vertex whose classifications are propagated
+     * @param toVertex   entity vertex from which the impacted entities are determined
+     * @param edge       relationship edge; its relationship guid is passed to the impacted-vertices lookup
+     * @throws AtlasBaseException if the graph cannot be updated
+     */
+    private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
+        final List<AtlasVertex> tagVertices = getPropagationEnabledClassificationVertices(fromVertex);
+
+        // nothing to propagate: skip the impacted-vertices lookup altogether
+        if (CollectionUtils.isEmpty(tagVertices)) {
+            return;
+        }
+
+        final List<AtlasVertex> targetVertices = graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge));
+
+        if (CollectionUtils.isEmpty(targetVertices)) {
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Propagate {} tags: from {} entity to {} entities", tagVertices.size(), getTypeName(fromVertex), targetVertices.size());
+        }
+
+        for (AtlasVertex tagVertex : tagVertices) {
+            final String                  tagName      = getTypeName(tagVertex);
+            final String                  edgeLabel    = getPropagatedEdgeLabel(tagName);
+            final AtlasVertex             ownerVertex  = getAssociatedEntityVertex(tagVertex);
+            final AtlasClassificationType tagType      = typeRegistry.getClassificationTypeByName(tagName);
+
+            for (AtlasVertex targetVertex : targetVertices) {
+                // the classification is directly associated with this entity
+                if (edgeExists(targetVertex, tagVertex, tagName)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
+                                  getTypeName(targetVertex), getTypeName(tagVertex), getTypeName(ownerVertex), tagName);
+                    }
+
+                    continue;
+                }
+
+                // the classification has already been propagated to this entity
+                if (edgeExists(targetVertex, tagVertex, edgeLabel)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]",
+                                  getTypeName(targetVertex), getTypeName(tagVertex), getTypeName(ownerVertex), edgeLabel);
+                    }
+
+                    continue;
+                }
+
+                final AtlasEntityType targetType = typeRegistry.getEntityTypeByName(getTypeName(targetVertex));
+
+                if (!tagType.canApplyToEntityType(targetType)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type",
+                                  getTypeName(targetVertex), getTypeName(tagVertex), getTypeName(ownerVertex));
+                    }
+
+                    continue;
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]",
+                              getTypeName(targetVertex), getTypeName(tagVertex), getTypeName(ownerVertex), edgeLabel);
+                }
+
+                graphHelper.addEdge(targetVertex, tagVertex, edgeLabel);
+
+                GraphHelper.addListProperty(targetVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, tagName);
+            }
+        }
+    }
+
+    /**
+     * Withdraws the propagation-enabled classifications of {@code fromVertex} from the entities
+     * impacted through {@code toVertex}.
+     *
+     * For each classification and each impacted entity, the propagated-classification edge is
+     * removed and the classification name is dropped from the entity's propagated trait names,
+     * unless the entity also appears in the list computed from the classification's associated
+     * entity vertex ("referrals"), or no propagated edge exists.
+     *
+     * @param fromVertex entity vertex whose classifications had been propagated
+     * @param toVertex   entity vertex from which the impacted entities are determined
+     * @param edge       relationship edge; its relationship guid is passed to the impacted-vertices lookups
+     * @throws AtlasBaseException with INTERNAL_ERROR when the graph helper raises a RepositoryException
+     */
+    private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex 
toVertex, AtlasEdge edge) throws AtlasBaseException {
+        final List<AtlasVertex> classificationVertices = 
getPropagationEnabledClassificationVertices(fromVertex);
+        // impacted vertices are only looked up when there is at least one classification to withdraw
+        final List<AtlasVertex> impactedEntityVertices = 
CollectionUtils.isNotEmpty(classificationVertices) ? 
graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, 
getRelationshipGuid(edge)) : null;
+
+        if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Removing {} propagated tags: for {} from {} 
entities", classificationVertices.size(), getTypeName(fromVertex), 
impactedEntityVertices.size());
+            }
+
+            for (AtlasVertex classificationVertex : classificationVertices) {
+                String            classificationName     = 
getTypeName(classificationVertex);
+                String            propagatedEdgeLabel    = 
getPropagatedEdgeLabel(classificationName);
+                AtlasVertex       associatedEntityVertex = 
getAssociatedEntityVertex(classificationVertex);
+                // NOTE(review): presumably the entities still reachable from the classification's
+                // associated entity when this relationship is left out - confirm against GraphHelper
+                List<AtlasVertex> referrals = 
graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, 
getRelationshipGuid(edge));
+
+                for (AtlasVertex impactedEntityVertex : 
impactedEntityVertices) {
+                    // entities that also appear in 'referrals' keep their propagated edge
+                    if (referrals.contains(impactedEntityVertex)) {
+                        if (LOG.isDebugEnabled()) {
+                            if 
(StringUtils.equals(getGuid(impactedEntityVertex), 
getGuid(associatedEntityVertex))) {
+                                LOG.debug(" --> Not removing propagated 
classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is 
associated with [{}]",
+                                          getTypeName(impactedEntityVertex), 
getTypeName(classificationVertex), getTypeName(associatedEntityVertex), 
propagatedEdgeLabel, classificationName, getTypeName(associatedEntityVertex));
+                            } else {
+                                LOG.debug(" --> Not removing propagated 
classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is 
propagated through other path",
+                                          getTypeName(impactedEntityVertex), 
getTypeName(classificationVertex), getTypeName(associatedEntityVertex), 
propagatedEdgeLabel, classificationName);
+                            }
+                        }
+
+                        continue;
+                    }
+
+                    // remove propagated classification edge and 
classificationName from propagatedTraitNames vertex property
+                    if (edgeExists(impactedEntityVertex, classificationVertex, 
propagatedEdgeLabel)) {
+                        try {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(" --> Removing propagated 
classification edge from [{}] --> [{}][{}] with edge label: [{}]",
+                                          getTypeName(impactedEntityVertex), 
getTypeName(classificationVertex), getTypeName(associatedEntityVertex), 
propagatedEdgeLabel);
+                            }
+
+                            // NOTE(review): edgeExists() was checked just above, so this call is expected
+                            // to return the existing edge rather than create a new one - confirm
+                            AtlasEdge propagatedEdge = 
graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, 
propagatedEdgeLabel);
+
+                            graphHelper.removeEdge(propagatedEdge);
+
+                            
removePropagatedTraitNameFromVertex(impactedEntityVertex, classificationName);
+                        } catch (RepositoryException e) {
+                            // surface graph-helper failures as an internal error, keeping the cause
+                            throw new 
AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not removing propagated 
classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge 
doesn't exist",
+                                      getTypeName(impactedEntityVertex), 
getTypeName(classificationVertex), getTypeName(associatedEntityVertex), 
propagatedEdgeLabel);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
 
b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 1fda241..58e3492 100644
--- 
a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ 
b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -63,6 +63,18 @@ public class AtlasGremlin3QueryProvider extends 
AtlasGremlin2QueryProvider {
                 return "g.V().range(0,1).toList()";
             case GREMLIN_SEARCH_RETURNS_EDGE_ID:
                 return "g.E().range(0,1).toList()";
+
+            case TAG_PROPAGATION_IMPACTED_INSTANCES:
+                return "g.V().has('__guid', guid).aggregate('src')" +
+                            ".repeat(union(outE().has('__state', 
'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).inV(), " +
+                                           "inE().has('__state', 
'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" +
+                            
".dedup().where(without('src')).simplePath()).emit().toList();";
+
+            case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL:
+                return "g.V().has('__guid', guid).aggregate('src')" +
+                            ".repeat(union(outE().has('__state', 
'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', 
neq(relationshipGuid)).inV(), " +
+                                           "inE().has('__state', 
'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', 
neq(relationshipGuid)).outV())" +
+                            
".dedup().where(without('src')).simplePath()).emit().toList();";
         }
         return super.getQuery(gremlinQuery);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java 
b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index a79abaa..3e3ee11 100644
--- 
a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ 
b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -81,6 +81,9 @@ public abstract class AtlasGremlinQueryProvider {
         COMPARE_ENDS_WITH,
         COMPARE_CONTAINS,
         COMPARE_IS_NULL,
-        COMPARE_NOT_NULL
+        COMPARE_NOT_NULL,
+
+        TAG_PROPAGATION_IMPACTED_INSTANCES,
+        TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
 
b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
index 0e1e5b6..85f0d06 100644
--- 
a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
+++ 
b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.atlas.repository.graphdb.GraphDatabase;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
 import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +53,7 @@ public class AtlasRepositoryConfiguration {
     private static Integer typeUpdateLockMaxWaitTimeInSeconds = null;
 
     private static final String ENABLE_FULLTEXT_SEARCH_PROPERTY = 
"atlas.search.fulltext.enable";
+    private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = 
"atlas.notification.entity.version";
 
     /**
      * Configures whether the full text vertex property is populated.  Turning 
this off
@@ -62,6 +64,19 @@ public class AtlasRepositoryConfiguration {
         return 
ApplicationProperties.get().getBoolean(ENABLE_FULLTEXT_SEARCH_PROPERTY, true);
     }
 
+    /**
+     * Tells whether entity notifications are configured to use the v2 format.
+     *
+     * Reads the entity-notification version property, defaulting to "v2" when it is not set,
+     * and compares it case-insensitively with "v2".
+     *
+     * @return true when the configured version is "v2" (in any letter case), or when the
+     *         application properties cannot be obtained; false otherwise
+     */
+    public static boolean isV2EntityNotificationEnabled() {
+        try {
+            String configuredVersion = ApplicationProperties.get().getString(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2");
+
+            return StringUtils.equalsIgnoreCase(configuredVersion, "v2");
+        } catch (AtlasException e) {
+            // properties unavailable: behave as if the property was left at its "v2" default
+            return true;
+        }
+    }
+
     private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = 
"atlas.EntityAuditRepository.impl";
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/TestModules.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java 
b/repository/src/test/java/org/apache/atlas/TestModules.java
index 13bdcb0..c901e89 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -30,8 +30,10 @@ import org.apache.atlas.discovery.EntityDiscoveryService;
 import org.apache.atlas.discovery.EntityLineageService;
 import org.apache.atlas.graph.GraphSandboxUtil;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
 import org.apache.atlas.listener.TypeDefChangeListener;
 import org.apache.atlas.repository.audit.EntityAuditListener;
+import org.apache.atlas.repository.audit.EntityAuditListenerV2;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -145,6 +147,10 @@ public class TestModules {
                     Multibinder.newSetBinder(binder(), 
EntityChangeListener.class);
             
entityChangeListenerBinder.addBinding().to(EntityAuditListener.class);
 
+            Multibinder<EntityChangeListenerV2> entityChangeListenerV2Binder =
+                    Multibinder.newSetBinder(binder(), 
EntityChangeListenerV2.class);
+            
entityChangeListenerV2Binder.addBinding().to(EntityAuditListenerV2.class);
+
             final GraphTransactionInterceptor graphTransactionInterceptor = 
new GraphTransactionInterceptor(new AtlasGraphProvider().get());
             requestInjection(graphTransactionInterceptor);
             bindInterceptor(Matchers.any(), 
Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
 
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index 47a61ee..4d70b7f 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -46,9 +46,9 @@ public class AuditRepositoryTestBase {
         EntityAuditEvent event = new EntityAuditEvent(rand(), 
System.currentTimeMillis(), "u1",
                 EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1", new 
Referenceable(rand()));
 
-        eventRepository.putEvents(event);
+        eventRepository.putEventsV1(event);
 
-        List<EntityAuditEvent> events = 
eventRepository.listEvents(event.getEntityId(), null, (short) 10);
+        List<EntityAuditEvent> events = 
eventRepository.listEventsV1(event.getEntityId(), null, (short) 10);
 
         assertEquals(events.size(), 1);
         assertEventEquals(events.get(0), event);
@@ -67,28 +67,28 @@ public class AuditRepositoryTestBase {
             //Add events for both ids
             EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" 
+ i, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity);
 
-            eventRepository.putEvents(event);
+            eventRepository.putEventsV1(event);
             expectedEvents.add(event);
-            eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" 
+ i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity));
-            eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" 
+ i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity));
+            eventRepository.putEventsV1(new EntityAuditEvent(id1, ts - i, 
"user" + i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, 
entity));
+            eventRepository.putEventsV1(new EntityAuditEvent(id3, ts - i, 
"user" + i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity));
         }
 
         //Use ts for which there is no event - ts + 2
-        List<EntityAuditEvent> events = eventRepository.listEvents(id2, null, 
(short) 3);
+        List<EntityAuditEvent> events = eventRepository.listEventsV1(id2, 
null, (short) 3);
         assertEquals(events.size(), 3);
         assertEventEquals(events.get(0), expectedEvents.get(0));
         assertEventEquals(events.get(1), expectedEvents.get(1));
         assertEventEquals(events.get(2), expectedEvents.get(2));
 
         //Use last event's timestamp for next list(). Should give only 1 event 
and shouldn't include events from other id
-        events = eventRepository.listEvents(id2, events.get(2).getEventKey(), 
(short) 3);
+        events = eventRepository.listEventsV1(id2, 
events.get(2).getEventKey(), (short) 3);
         assertEquals(events.size(), 1);
         assertEventEquals(events.get(0), expectedEvents.get(2));
     }
 
     @Test
     public void testInvalidEntityId() throws Exception {
-        List<EntityAuditEvent> events = eventRepository.listEvents(rand(), 
null, (short) 3);
+        List<EntityAuditEvent> events = eventRepository.listEventsV1(rand(), 
null, (short) 3);
 
         assertEquals(events.size(), 0);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
 
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
new file mode 100644
index 0000000..01a95cf
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListenerV2;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2;
+import 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.atlas.notification.NotificationEntityChangeListener.ATLAS_ENTITY_NOTIFICATION_PROPERTY;
+import static 
org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_DELETE;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_UPDATE;
+import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_CREATE;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_DELETE;
+import static 
org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_UPDATE;
+
+/**
+ * {@link EntityChangeListenerV2} implementation that converts entity and classification changes into
+ * {@link EntityNotificationV2} messages and sends them through the {@link NotificationInterface}
+ * using the ENTITIES notification type.
+ *
+ * Entities for which {@code isInternalType()} returns true are skipped. When a non-empty list is
+ * configured under {@code ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + typeName + ".attributes.include"},
+ * attributes not named in that list are removed from the entity before its message is built.
+ */
+@Component
+public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
+    private final AtlasTypeRegistry         typeRegistry;
+    private final NotificationInterface     notificationInterface;
+    // per entity-type result of the "attributes.include" lookup; a null value is cached as well
+    private final Map<String, List<String>> notificationAttributesCache = new HashMap<>();
+
+    // loaded on first use by initApplicationProperties(); stays null while loading keeps failing
+    private static Configuration APPLICATION_PROPERTIES = null;
+
+    /**
+     * @param typeRegistry          registry used to resolve classification types and their super-types
+     * @param notificationInterface channel the notification messages are sent through
+     */
+    @Inject
+    public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, NotificationInterface notificationInterface) {
+        this.typeRegistry          = typeRegistry;
+        this.notificationInterface = notificationInterface;
+    }
+
+    /** Sends an ENTITY_CREATE notification for the given entities; {@code isImport} is not consulted. */
+    @Override
+    public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_CREATE);
+    }
+
+    /** Sends an ENTITY_UPDATE notification for the given entities; {@code isImport} is not consulted. */
+    @Override
+    public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_UPDATE);
+    }
+
+    /** Sends an ENTITY_DELETE notification for the given entities; {@code isImport} is not consulted. */
+    @Override
+    public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
+        notifyEntityEvents(entities, ENTITY_DELETE);
+    }
+
+    /** Sends a CLASSIFICATION_ADD notification for the entity; the classification list itself is not used. */
+    @Override
+    public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD);
+    }
+
+    /** Sends a CLASSIFICATION_UPDATE notification for the entity; the classification list itself is not used. */
+    @Override
+    public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_UPDATE);
+    }
+
+    /** Sends a CLASSIFICATION_DELETE notification for the entity; the classification names are not used. */
+    @Override
+    public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException {
+        notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE);
+    }
+
+    /**
+     * Builds one notification message per non-internal entity and sends them in a single call.
+     * Nothing is sent when no message was built.
+     *
+     * @param entities      entities that changed
+     * @param operationType operation recorded in every message
+     * @throws AtlasBaseException with ENTITY_NOTIFICATION_FAILED when sending fails
+     */
+    private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
+        List<EntityNotificationV2> notifications = new ArrayList<>();
+
+        for (AtlasEntity entity : entities) {
+            if (!isInternalType(entity.getTypeName())) {
+                // trim the attributes first, so the message is built from the filtered entity
+                filterNotificationAttributes(entity);
+
+                notifications.add(new EntityNotificationV2(entity, operationType, getAllClassifications(entity)));
+            }
+        }
+
+        if (notifications.isEmpty()) {
+            return;
+        }
+
+        try {
+            notificationInterface.send(ENTITIES, notifications);
+        } catch (NotificationException e) {
+            throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
+        }
+    }
+
+    /** Returns the entity's classifications expanded with one entry per super-type. */
+    private List<AtlasClassification> getAllClassifications(AtlasEntity entity) {
+        return getAllClassifications(entity.getClassifications(), typeRegistry);
+    }
+
+    /**
+     * Expands the given classifications: each one is followed in the result by a classification
+     * for every super-type name reported by its classification type.
+     *
+     * @param classifications classifications to expand; may be null or empty
+     * @param typeRegistry    registry used to look up classification types
+     * @return a new list; empty when there is nothing to expand
+     */
+    private static List<AtlasClassification> getAllClassifications(List<AtlasClassification> classifications, AtlasTypeRegistry typeRegistry) {
+        List<AtlasClassification> expanded = new LinkedList<>();
+
+        if (CollectionUtils.isEmpty(classifications)) {
+            return expanded;
+        }
+
+        for (AtlasClassification classification : classifications) {
+            AtlasClassificationType type           = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+            Set<String>             superTypeNames = type != null ? type.getAllSuperTypes() : null;
+
+            expanded.add(classification);
+
+            if (CollectionUtils.isNotEmpty(superTypeNames)) {
+                for (String superTypeName : superTypeNames) {
+                    expanded.add(toSuperTypeClassification(classification, superTypeName, typeRegistry));
+                }
+            }
+        }
+
+        return expanded;
+    }
+
+    /**
+     * Creates a classification of the given super-type carrying the source's entity guid and
+     * propagate flag. Attributes are set only when the source has attributes and the super-type
+     * is registered with a non-empty attribute map; then the source attributes whose names the
+     * super-type knows are copied.
+     */
+    private static AtlasClassification toSuperTypeClassification(AtlasClassification source, String superTypeName, AtlasTypeRegistry typeRegistry) {
+        AtlasClassification superTypeClassification = new AtlasClassification(superTypeName);
+
+        superTypeClassification.setEntityGuid(source.getEntityGuid());
+        superTypeClassification.setPropagate(source.isPropagate());
+
+        Map<String, Object> sourceAttributes = source.getAttributes();
+
+        if (MapUtils.isEmpty(sourceAttributes)) {
+            return superTypeClassification;
+        }
+
+        AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
+
+        if (superType == null || MapUtils.isEmpty(superType.getAllAttributes())) {
+            return superTypeClassification;
+        }
+
+        Map<String, Object> inheritedAttributes = new HashMap<>();
+
+        for (Map.Entry<String, Object> attribute : sourceAttributes.entrySet()) {
+            if (superType.getAllAttributes().containsKey(attribute.getKey())) {
+                inheritedAttributes.put(attribute.getKey(), attribute.getValue());
+            }
+        }
+
+        superTypeClassification.setAttributes(inheritedAttributes);
+
+        return superTypeClassification;
+    }
+
+    /**
+     * Removes, from the map returned by {@code entity.getAttributes()}, every attribute that is
+     * not in the include-list configured for the entity's type. Does nothing when the entity has
+     * no attributes or no non-empty include-list is available.
+     */
+    private void filterNotificationAttributes(AtlasEntity entity) {
+        Map<String, Object> entityAttributes = entity.getAttributes();
+        List<String>        includedNames    = getNotificationAttributes(entity.getTypeName());
+
+        if (MapUtils.isEmpty(entityAttributes) || CollectionUtils.isEmpty(includedNames)) {
+            return;
+        }
+
+        // compute the names to drop up front, so the map is not modified while its key set is read
+        Collection<String> excludedNames = CollectionUtils.subtract(entityAttributes.keySet(), includedNames);
+
+        for (String excludedName : excludedNames) {
+            entityAttributes.remove(excludedName);
+        }
+    }
+
+    /**
+     * Returns the include-list of attribute names configured for the given entity type, using the
+     * cache when the type was looked up before.
+     *
+     * @return the cached or freshly read list; null when the application properties are not
+     *         available or the property lookup returned null
+     */
+    private List<String> getNotificationAttributes(String entityType) {
+        initApplicationProperties();
+
+        if (notificationAttributesCache.containsKey(entityType)) {
+            return notificationAttributesCache.get(entityType);
+        }
+
+        if (APPLICATION_PROPERTIES == null) {
+            // nothing is cached here, so the lookup is attempted again on the next call
+            return null;
+        }
+
+        String[] configuredNames = APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." +
+                                                                         entityType + "." + "attributes.include");
+        List<String> includedNames = configuredNames != null ? Arrays.asList(configuredNames) : null;
+
+        notificationAttributesCache.put(entityType, includedNames);
+
+        return includedNames;
+    }
+
+    /** Loads the application properties if not loaded yet; a failure leaves them null. */
+    private void initApplicationProperties() {
+        if (APPLICATION_PROPERTIES != null) {
+            return;
+        }
+
+        try {
+            APPLICATION_PROPERTIES = ApplicationProperties.get();
+        } catch (AtlasException ex) {
+            // ignore
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
index 396a292..a3e5949 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java
@@ -42,7 +42,7 @@ import java.util.*;
  */
 @Component
 public class NotificationEntityChangeListener implements EntityChangeListener {
-    private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = 
"atlas.notification.entity";
+    protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = 
"atlas.notification.entity";
 
     private final NotificationInterface     notificationInterface;
     private final AtlasTypeRegistry         typeRegistry;

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 715a54d..4f4f091 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.web.resources;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -28,7 +27,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.CreateUpdateEntitiesResult;
 import org.apache.atlas.EntityAuditEvent;
-import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
@@ -98,7 +97,7 @@ public class EntityResource {
     private final AtlasTypeRegistry      typeRegistry;
     private final EntityREST             entityREST;
     private final EntityAuditRepository  entityAuditRepository;
-    private final AtlasDiscoveryService  atlasDiscoveryService;
+    private final AtlasInstanceConverter instanceConverter;
 
     @Context
     UriInfo uriInfo;
@@ -109,13 +108,13 @@ public class EntityResource {
                           final AtlasTypeRegistry typeRegistry,
                           final EntityREST entityREST,
                           final EntityAuditRepository entityAuditRepository,
-                          final AtlasDiscoveryService atlasDiscoveryService) {
+                          final AtlasInstanceConverter instanceConverter) {
         this.restAdapters  = restAdapters;
         this.entitiesStore = entitiesStore;
         this.typeRegistry  = typeRegistry;
         this.entityREST    = entityREST;
         this.entityAuditRepository = entityAuditRepository;
-        this.atlasDiscoveryService = atlasDiscoveryService;
+        this.instanceConverter = instanceConverter;
     }
 
     /**
@@ -1133,20 +1132,27 @@ public class EntityResource {
 
         AtlasPerfTracer perf = null;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Audit events request for entity {}, start key {}, 
number of results required {}", guid, startKey, count);
-        }
-
         try {
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"EntityResource.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
             }
 
-            List<EntityAuditEvent> events = 
entityAuditRepository.listEvents(guid, startKey, count);
+            List                   events   = 
entityAuditRepository.listEvents(guid, startKey, count);
+            List<EntityAuditEvent> v1Events = new ArrayList<>();
+
+            for (Object event : events) {
+                if (event instanceof EntityAuditEvent) {
+                    v1Events.add((EntityAuditEvent) event);
+                } else if (event instanceof EntityAuditEventV2) {
+                    
v1Events.add(instanceConverter.toV1AuditEvent((EntityAuditEventV2) event));
+                } else {
+                    LOG.warn("unknown entity-audit event type {}. Ignored", 
event != null ? event.getClass().getCanonicalName() : "null");
+                }
+            }
 
             Map<String, Object> response = new HashMap<>();
             response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
-            response.put(AtlasClient.EVENTS, events);
+            response.put(AtlasClient.EVENTS, v1Events);
             return Response.ok(AtlasJson.toV1Json(response)).build();
         } catch (IllegalArgumentException e) {
             LOG.error("Unable to get audit events for entity guid={} 
startKey={}", guid, startKey, e);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java 
b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 21402e1..fdafa2c 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.web.rest;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
@@ -26,6 +28,8 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.ClassificationAssociateRequest;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -38,6 +42,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
@@ -45,6 +50,7 @@ import javax.inject.Singleton;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -67,17 +73,24 @@ import java.util.Map;
 @Singleton
 @Service
 public class EntityREST {
+    private static final Logger LOG      = 
LoggerFactory.getLogger(EntityREST.class);
     private static final Logger PERF_LOG = 
AtlasPerfTracer.getPerfLogger("rest.EntityREST");
 
     public static final String PREFIX_ATTR = "attr:";
 
-    private final AtlasTypeRegistry         typeRegistry;
-    private final AtlasEntityStore          entitiesStore;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final AtlasEntityStore       entitiesStore;
+    private final EntityAuditRepository  auditRepository;
+    private final AtlasInstanceConverter instanceConverter;
+
 
     @Inject
-    public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore 
entitiesStore) {
-        this.typeRegistry    = typeRegistry;
-        this.entitiesStore   = entitiesStore;
+    public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore 
entitiesStore,
+                      EntityAuditRepository auditRepository, 
AtlasInstanceConverter instanceConverter) {
+        this.typeRegistry      = typeRegistry;
+        this.entitiesStore     = entitiesStore;
+        this.auditRepository   = auditRepository;
+        this.instanceConverter = instanceConverter;
     }
 
     /**
@@ -409,14 +422,14 @@ public class EntityREST {
     @PUT
     @Path("/guid/{guid}/classifications")
     @Produces(Servlets.JSON_MEDIA_TYPE)
-    public void updateClassification(@PathParam("guid") final String guid, 
List<AtlasClassification> classifications) throws AtlasBaseException {
+    public void updateClassifications(@PathParam("guid") final String guid, 
List<AtlasClassification> classifications) throws AtlasBaseException {
         Servlets.validateQueryParamLength("guid", guid);
 
         AtlasPerfTracer perf = null;
 
         try {
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
-                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"EntityREST.updateClassification(" + guid + ")");
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"EntityREST.updateClassifications(" + guid + ")");
             }
 
             if (StringUtils.isEmpty(guid)) {
@@ -581,6 +594,38 @@ public class EntityREST {
         }
     }
 
+    /**
+     * Returns the audit events recorded for an entity, in the v2 audit-event format.
+     *
+     * Events the audit repository returns as {@link EntityAuditEvent} are converted with the
+     * instance converter; events of any other unexpected type are logged and left out.
+     *
+     * @param guid     guid of the entity, taken from the request path
+     * @param startKey value passed to the audit repository as the start key; may be null
+     * @param count    value passed to the audit repository as the result count; defaults to 100
+     * @return the events as {@link EntityAuditEventV2}, in the order the repository returned them
+     * @throws AtlasBaseException declared on the method signature
+     */
+    @GET
+    @Path("{guid}/audit")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid,
+                                                   @QueryParam("startKey") String startKey,
+                                                   @QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")");
+            }
+
+            List<EntityAuditEventV2> v2Events = new ArrayList<>();
+
+            // the repository may return v1 and v2 events mixed, so each element is checked individually
+            for (Object auditEvent : auditRepository.listEvents(guid, startKey, count)) {
+                if (auditEvent instanceof EntityAuditEventV2) {
+                    v2Events.add((EntityAuditEventV2) auditEvent);
+                } else if (auditEvent instanceof EntityAuditEvent) {
+                    v2Events.add(instanceConverter.toV2AuditEvent((EntityAuditEvent) auditEvent));
+                } else {
+                    String eventTypeName = auditEvent == null ? "null" : auditEvent.getClass().getCanonicalName();
+
+                    LOG.warn("unknown entity-audit event type {}. Ignored", eventTypeName);
+                }
+            }
+
+            return v2Events;
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     private AtlasEntityType ensureEntityType(String typeName) throws 
AtlasBaseException {
         AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java 
b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
index ea6fe31..f0bc962 100644
--- a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
+++ b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java
@@ -164,7 +164,7 @@ public class TestEntityREST {
             put("tag", "tagName_updated");
         }});
 
-        entityREST.updateClassification(dbEntity.getGuid(), new 
ArrayList<>(Arrays.asList(phiClassification, testClassification)));
+        entityREST.updateClassifications(dbEntity.getGuid(), new 
ArrayList<>(Arrays.asList(phiClassification, testClassification)));
 
         AtlasClassification updatedClassification = 
entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.PHI);
         Assert.assertNotNull(updatedClassification);

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
index f769431..e4887d2 100755
--- 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
@@ -808,7 +808,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         } catch (AtlasServiceException e) {
             assertNotNull(e);
             assertNotNull(e.getStatus());
-            assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
+            assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
index 0f43d6f..dabb2ef 100755
--- 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java
@@ -560,7 +560,7 @@ public class EntityV2JerseyResourceIT extends 
BaseResourceIT {
             fail("Deletion should've failed for non-existent trait 
association");
         } catch (AtlasServiceException ex) {
             Assert.assertNotNull(ex.getStatus());
-            assertEquals(ex.getStatus(), ClientResponse.Status.NOT_FOUND);
+            assertEquals(ex.getStatus(), ClientResponse.Status.BAD_REQUEST);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/atlas-application.properties 
b/webapp/src/test/resources/atlas-application.properties
index d900916..62fa603 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -64,6 +64,7 @@ atlas.lineage.schema.query.hive_table_v1=hive_table_v1 where 
__guid='%s'\, colum
 
 #########  Notification Configs  #########
 atlas.notification.embedded=true
+atlas.notification.entity.version=v1
 
 atlas.kafka.zookeeper.connect=localhost:19026
 atlas.kafka.bootstrap.servers=localhost:19027

Reply via email to