http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index d61bff2..9d56fa9 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -26,10 +26,12 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -55,6 +57,9 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.script.Bindings;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,6 +73,14 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static 
org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
+import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL;
 
 /**
  * Utility class for graph operations.
@@ -80,6 +93,8 @@ public final class GraphHelper {
     public static final String RETRY_COUNT = "atlas.graph.storage.num.retries";
     public static final String RETRY_DELAY = 
"atlas.graph.storage.retry.sleeptime.ms";
 
+    private final AtlasGremlinQueryProvider queryProvider = 
AtlasGremlinQueryProvider.INSTANCE;
+
     private static volatile GraphHelper INSTANCE;
 
     private AtlasGraph graph;
@@ -166,7 +181,7 @@ public final class GraphHelper {
         return vertexWithoutIdentity;
     }
 
-    private AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, 
String edgeLabel) {
+    public AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, 
String edgeLabel) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding edge for {} -> label {} -> {}", 
string(fromVertex), edgeLabel, string(toVertex));
         }
@@ -266,6 +281,26 @@ public final class GraphHelper {
         return (AtlasEdge) findElement(false, args);
     }
 
+    /**
+     * Checks whether a usable edge with the given label already links two vertices.
+     *
+     * The incoming edges of {@code inVertex} that carry {@code edgeLabel} are scanned; the first one
+     * that starts at {@code outVertex} and has either no status or status ACTIVE counts as a match.
+     *
+     * @param outVertex vertex the edge has to start from
+     * @param inVertex  vertex the edge has to point to
+     * @param edgeLabel label of the edge to look for
+     * @return true when a matching edge is found, false otherwise
+     */
+    public static boolean edgeExists(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
+        Iterator<AtlasEdge> incoming = getAdjacentEdgesByLabel(inVertex, AtlasEdgeDirection.IN, edgeLabel);
+
+        if (incoming == null) {
+            return false;
+        }
+
+        while (incoming.hasNext()) {
+            AtlasEdge candidate = incoming.next();
+
+            // edges coming from some other vertex are irrelevant
+            if (!candidate.getOutVertex().equals(outVertex)) {
+                continue;
+            }
+
+            Status state = getStatus(candidate);
+
+            if (state == null || state == ACTIVE) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private AtlasElement findElement(boolean isVertexSearch, Object... args) 
throws EntityNotFoundException {
         AtlasGraphQuery query = graph.query();
 
@@ -522,7 +557,7 @@ public final class GraphHelper {
     }
 
     /**
-     * Adds an additional value to a multi-property.
+     * Adds an additional value to a multi-property (SET).
      *
      * @param vertex
      * @param propertyName
@@ -539,6 +574,23 @@ public final class GraphHelper {
     }
 
     /**
+     * Adds an additional value to a multi-property (LIST).
+     *
+     * @param vertex
+     * @param propertyName
+     * @param value
+     */
+    public static void addListProperty(AtlasVertex vertex, String propertyName, Object value) {
+        // the key is encoded once and the encoded form is used for both logging and storage
+        final String encodedName = encodePropertyKey(propertyName);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding property {} = \"{}\" to vertex {}", encodedName, value, string(vertex));
+        }
+
+        vertex.addListProperty(encodedName, value);
+    }
+
+    /**
      * Remove the specified edge from the graph.
      *
      * @param edge
@@ -635,6 +687,73 @@ public final class GraphHelper {
         return result;
     }
 
+    /**
+     * Returns the given entity vertex followed by every vertex reported by
+     * {@link #getImpactedVerticesWithReferences(String, String)} for that entity's GUID.
+     *
+     * @param entityVertex     vertex that always becomes the first element of the result
+     * @param relationshipGuid relationship GUID handed through to the impacted-vertices lookup
+     * @return a new list holding {@code entityVertex} plus the impacted vertices, if any
+     * @throws AtlasBaseException if the impacted-vertices lookup fails
+     */
+    public List<AtlasVertex> getIncludedImpactedVerticesWithReferences(AtlasVertex entityVertex, String relationshipGuid) throws AtlasBaseException {
+        List<AtlasVertex> impacted = getImpactedVerticesWithReferences(getGuid(entityVertex), relationshipGuid);
+        List<AtlasVertex> result   = new ArrayList<>();
+
+        result.add(entityVertex);
+
+        if (CollectionUtils.isNotEmpty(impacted)) {
+            result.addAll(impacted);
+        }
+
+        return result;
+    }
+
+    /**
+     * Runs the TAG_PROPAGATION_IMPACTED_INSTANCES Gremlin query with {@code guid} bound and returns
+     * the vertices it yields.
+     *
+     * @param guid value bound to the script variable "guid"
+     * @return the vertex list produced by the script, or an empty list when the script yields
+     *         nothing or something other than a list of vertices
+     * @throws AtlasBaseException with GREMLIN_SCRIPT_EXECUTION_FAILED if the script fails
+     */
+    public List<AtlasVertex> getImpactedVertices(String guid) throws AtlasBaseException {
+        ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
+        Bindings     bindings     = scriptEngine.createBindings();
+        String       query        = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES);
+
+        bindings.put("g", graph);
+        bindings.put("guid", guid);
+
+        return executeImpactedVerticesQuery(scriptEngine, bindings, query);
+    }
+
+    /**
+     * Runs the TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL Gremlin query with {@code guid} and
+     * {@code relationshipGuid} bound and returns the vertices it yields.
+     *
+     * @param guid             value bound to the script variable "guid"
+     * @param relationshipGuid value bound to the script variable "relationshipGuid"
+     * @return the vertex list produced by the script, or an empty list when the script yields
+     *         nothing or something other than a list of vertices
+     * @throws AtlasBaseException with GREMLIN_SCRIPT_EXECUTION_FAILED if the script fails
+     */
+    public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException {
+        ScriptEngine scriptEngine = graph.getGremlinScriptEngine();
+        Bindings     bindings     = scriptEngine.createBindings();
+        String       query        = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL);
+
+        bindings.put("g", graph);
+        bindings.put("guid", guid);
+        bindings.put("relationshipGuid", relationshipGuid);
+
+        return executeImpactedVerticesQuery(scriptEngine, bindings, query);
+    }
+
+    /**
+     * Executes a Gremlin script and interprets its result as a list of vertices.
+     *
+     * Shared by the impacted-vertices lookups so that script execution, error translation and
+     * result conversion live in one place.
+     */
+    private List<AtlasVertex> executeImpactedVerticesQuery(ScriptEngine scriptEngine, Bindings bindings, String query) throws AtlasBaseException {
+        // NOTE(review): the engine obtained from graph.getGremlinScriptEngine() is never handed back;
+        // if AtlasGraph offers a release method it should probably be called in a finally block - confirm.
+        final Object resultObj;
+
+        try {
+            resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e);
+        }
+
+        if (resultObj instanceof List) {
+            List<?> results = (List<?>) resultObj;
+
+            // only the first element is inspected; the list is assumed to be homogeneous
+            if (!results.isEmpty() && results.get(0) instanceof AtlasVertex) {
+                @SuppressWarnings("unchecked")
+                List<AtlasVertex> vertices = (List<AtlasVertex>) results;
+
+                return vertices;
+            }
+        }
+
+        return new ArrayList<>();
+    }
 
     /**
      * Finds the Vertices that correspond to the given GUIDs.  GUIDs
@@ -655,13 +774,60 @@ public final class GraphHelper {
         return attrName;
     }
 
+    /**
+     * Returns the label for the given trait name; currently the name is returned unchanged.
+     *
+     * @param traitName name of the trait
+     * @return {@code traitName} itself
+     */
+    public static String getTraitLabel(String traitName) {
+        return traitName;
+    }
+
+    /**
+     * Builds the edge label used for a propagated classification by prefixing the
+     * classification name with "propagated:".
+     *
+     * @param classificationName name of the classification
+     * @return "propagated:" followed by {@code classificationName}
+     */
+    public static String getPropagatedEdgeLabel(String classificationName) {
+        return "propagated:" + classificationName;
+    }
+
+    /**
+     * Collects the trait names stored on a vertex under TRAIT_NAMES_PROPERTY_KEY followed by those
+     * stored under PROPAGATED_TRAIT_NAMES_PROPERTY_KEY.
+     *
+     * @param entityVertex vertex to read from; may be null
+     * @return a new list with both groups of names, empty when the vertex is null or has none
+     */
+    public static List<String> getAllTraitNames(AtlasVertex<?, ?> entityVertex) {
+        ArrayList<String> allNames = new ArrayList<>();
+
+        if (entityVertex == null) {
+            return allNames;
+        }
+
+        // direct names first, propagated names second
+        String[] propertyKeys = { TRAIT_NAMES_PROPERTY_KEY, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY };
+
+        for (String propertyKey : propertyKeys) {
+            Collection<String> names = entityVertex.getPropertyValues(propertyKey, String.class);
+
+            if (CollectionUtils.isNotEmpty(names)) {
+                allNames.addAll(names);
+            }
+        }
+
+        return allNames;
+    }
+
+    /**
+     * Returns the trait names stored on a vertex under TRAIT_NAMES_PROPERTY_KEY.
+     *
+     * @param entityVertex vertex to read from; may be null
+     * @return a new list of names, empty when the vertex is null or has no such values
+     */
     public static List<String> getTraitNames(AtlasVertex<?,?> entityVertex) {
-        ArrayList<String> traits = new ArrayList<>();
-        Collection<String> propertyValues = 
entityVertex.getPropertyValues(Constants.TRAIT_NAMES_PROPERTY_KEY, 
String.class);
-        for(String value : propertyValues) {
-            traits.add(value);
+        ArrayList<String> ret = new ArrayList<>();
+
+        if (entityVertex != null) {
+            Collection<String> traitNames = 
entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+            if (CollectionUtils.isNotEmpty(traitNames)) {
+                ret.addAll(traitNames);
+            }
         }
-        return traits;
+
+        return ret;
+    }
+
+    /**
+     * Returns the trait names stored on a vertex under PROPAGATED_TRAIT_NAMES_PROPERTY_KEY.
+     *
+     * @param entityVertex vertex to read from; may be null
+     * @return a new list of names, empty when the vertex is null or has no such values
+     */
+    public static List<String> getPropagatedTraitNames(AtlasVertex<?,?> entityVertex) {
+        ArrayList<String> propagatedNames = new ArrayList<>();
+
+        if (entityVertex == null) {
+            return propagatedNames;
+        }
+
+        Collection<String> stored = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class);
+
+        if (CollectionUtils.isNotEmpty(stored)) {
+            propagatedNames.addAll(stored);
+        }
+
+        return propagatedNames;
     }
 
     public static List<String> getSuperTypeNames(AtlasVertex<?,?> 
entityVertex) {
@@ -691,6 +857,10 @@ public final class GraphHelper {
         return getIdFromVertex(getTypeName(vertex), vertex);
     }
 
+    /**
+     * Reads the value stored on an element under Constants.RELATIONSHIP_GUID_PROPERTY_KEY.
+     *
+     * @param element graph element to read from
+     * @return the property value as a String
+     */
+    public static String getRelationshipGuid(AtlasElement element) {
+        return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, 
String.class);
+    }
+
     public static String getGuid(AtlasElement element) {
         return element.<String>getProperty(Constants.GUID_PROPERTY_KEY, 
String.class);
     }
@@ -743,6 +913,25 @@ public final class GraphHelper {
         return 
element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
     }
 
+    /**
+     * Lists the vertices at the out-end of the propagated-classification edges that point to a
+     * classification vertex.
+     *
+     * The edge label is derived from the classification vertex's type name via
+     * {@link #getPropagatedEdgeLabel(String)}.
+     *
+     * @param classificationVertex classification vertex to inspect; may be null
+     * @return a new list of out-vertices, empty when the vertex is null or has no such edges
+     */
+    public List<AtlasVertex> getPropagatedEntityVerticesFromClassification(AtlasVertex classificationVertex) {
+        List<AtlasVertex> entityVertices = new ArrayList<>();
+
+        if (classificationVertex == null) {
+            return entityVertices;
+        }
+
+        String              edgeLabel       = getPropagatedEdgeLabel(getTypeName(classificationVertex));
+        Iterator<AtlasEdge> propagatedEdges = getIncomingEdgesByLabel(classificationVertex, edgeLabel);
+
+        if (propagatedEdges != null) {
+            while (propagatedEdges.hasNext()) {
+                AtlasEdge edge = propagatedEdges.next();
+
+                if (edge != null) {
+                    entityVertices.add(edge.getOutVertex());
+                }
+            }
+        }
+
+        return entityVertices;
+    }
+
     /**
      * For the given type, finds an unique attribute and checks if there is an 
existing instance with the same
      * unique value
@@ -881,6 +1070,41 @@ public final class GraphHelper {
         return Collections.emptyList();
     }
 
+    /**
+     * Collects the type names of those vertices that carry a value under
+     * Constants.ENTITY_TYPE_PROPERTY_KEY; vertices without that property are skipped.
+     *
+     * @param vertices vertices to inspect; may be null or empty
+     * @return a new list of type names in iteration order
+     */
+    public static List<String> getTypeNames(List<AtlasVertex> vertices) {
+        List<String> typeNames = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(vertices)) {
+            return typeNames;
+        }
+
+        for (AtlasVertex vertex : vertices) {
+            boolean hasEntityType = vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class) != null;
+
+            if (hasEntityType) {
+                typeNames.add(getTypeName(vertex));
+            }
+        }
+
+        return typeNames;
+    }
+
+    /**
+     * Finds the vertex at the out-end of the first non-null incoming edge of a classification
+     * vertex whose label equals the classification vertex's type name.
+     *
+     * @param classificationVertex classification vertex to inspect; may be null
+     * @return the out-vertex of the first such edge, or null when there is none
+     */
+    public static AtlasVertex getAssociatedEntityVertex(AtlasVertex classificationVertex) {
+        if (classificationVertex == null) {
+            return null;
+        }
+
+        Iterator<AtlasEdge> incoming = getIncomingEdgesByLabel(classificationVertex, getTypeName(classificationVertex));
+
+        while (incoming != null && incoming.hasNext()) {
+            AtlasEdge edge = incoming.next();
+
+            if (edge != null) {
+                return edge.getOutVertex();
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Guid and AtlasVertex combo
      */
@@ -919,7 +1143,8 @@ public final class GraphHelper {
         }
     }
 
-    /**
+    /*
+     /**
      * Get the GUIDs and vertices for all composite entities owned/contained 
by the specified root entity AtlasVertex.
      * The graph is traversed from the root entity through to the leaf nodes 
of the containment graph.
      *
@@ -1299,4 +1524,51 @@ public final class GraphHelper {
 
         return StringUtils.isNotEmpty(edge.getLabel()) ? 
edgeLabel.startsWith("r:") : false;
     }
+
+    /**
+     * Picks the object id of the vertex at the "other" end of a relationship edge.
+     *
+     * OUT uses the edge's in-vertex, IN uses its out-vertex. For BOTH the edge may run either way,
+     * so the end that is not {@code parentVertex} (compared by GUID) is used.
+     *
+     * @param edge                  relationship edge
+     * @param relationshipDirection direction of the relationship attribute
+     * @param parentVertex          vertex the edge is being looked at from; only consulted for BOTH
+     * @return the referenced object id, or null when the direction is none of OUT, IN, BOTH
+     */
+    public static AtlasObjectId getReferenceObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection,
+                                                     AtlasVertex parentVertex) {
+        if (relationshipDirection == OUT) {
+            return getAtlasObjectIdForInVertex(edge);
+        }
+
+        if (relationshipDirection == IN) {
+            return getAtlasObjectIdForOutVertex(edge);
+        }
+
+        if (relationshipDirection != BOTH) {
+            return null;
+        }
+
+        // direction BOTH: the parent may sit on either end, so reference the opposite one
+        boolean parentIsOutVertex = verticesEquals(parentVertex, edge.getOutVertex());
+
+        return parentIsOutVertex ? getAtlasObjectIdForInVertex(edge) : getAtlasObjectIdForOutVertex(edge);
+    }
+
+    /**
+     * Creates an object id from the GUID and type name of the edge's out-vertex.
+     *
+     * @param edge edge whose out-vertex is referenced
+     * @return a new AtlasObjectId for that vertex
+     */
+    public static AtlasObjectId getAtlasObjectIdForOutVertex(AtlasEdge edge) {
+        AtlasVertex outVertex = edge.getOutVertex();
+
+        return new AtlasObjectId(getGuid(outVertex), getTypeName(outVertex));
+    }
+
+    /**
+     * Creates an object id from the GUID and type name of the edge's in-vertex.
+     *
+     * @param edge edge whose in-vertex is referenced
+     * @return a new AtlasObjectId for that vertex
+     */
+    public static AtlasObjectId getAtlasObjectIdForInVertex(AtlasEdge edge) {
+        AtlasVertex inVertex = edge.getInVertex();
+
+        return new AtlasObjectId(getGuid(inVertex), getTypeName(inVertex));
+    }
+
+    /**
+     * Treats two vertices as equal when their GUID properties are equal strings.
+     *
+     * @param vertexA first vertex
+     * @param vertexB second vertex
+     * @return result of comparing the two GUIDs with StringUtils.equals
+     */
+    private static boolean verticesEquals(AtlasVertex vertexA, AtlasVertex 
vertexB) {
+        return StringUtils.equals(getGuid(vertexB), getGuid(vertexA));
+    }
+
+    /**
+     * Removes one occurrence of a propagated trait name from a vertex.
+     *
+     * When the name is present, the whole PROPAGATED_TRAIT_NAMES_PROPERTY_KEY property is dropped
+     * and the remaining names are written back one by one; otherwise the vertex is left untouched.
+     *
+     * @param entityVertex        vertex to update
+     * @param propagatedTraitName name to remove
+     */
+    public static void removePropagatedTraitNameFromVertex(AtlasVertex entityVertex, String propagatedTraitName) {
+        List<String> remaining = getPropagatedTraitNames(entityVertex);
+
+        // List.remove(Object) drops only the first occurrence and reports whether it found one
+        if (!remaining.remove(propagatedTraitName)) {
+            return;
+        }
+
+        entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
+
+        for (String remainingName : remaining) {
+            addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, remainingName);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 2b6bead..b9945d4 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -20,13 +20,14 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
+
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -49,22 +50,26 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
+
 
 @Component
 public class AtlasEntityChangeNotifier {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
 
-    private final Set<EntityChangeListener> entityChangeListeners;
-    private final AtlasInstanceConverter    instanceConverter;
+    private final Set<EntityChangeListener>   entityChangeListeners;
+    private final Set<EntityChangeListenerV2> entityChangeListenersV2;
+    private final AtlasInstanceConverter      instanceConverter;
 
     @Inject
     private FullTextMapperV2 fullTextMapperV2;
 
+    /**
+     * Creates the notifier with both generations of listeners.
+     *
+     * @param entityChangeListeners   V1 listeners, notified when V2 entity notification is disabled
+     * @param entityChangeListenersV2 V2 listeners, notified when V2 entity notification is enabled
+     * @param instanceConverter       converter used to turn entities into the form the listeners expect
+     */
     @Inject
-    public AtlasEntityChangeNotifier(Set<EntityChangeListener> 
entityChangeListeners,
-                                     AtlasInstanceConverter    
instanceConverter) {
-        this.entityChangeListeners = entityChangeListeners;
-        this.instanceConverter     = instanceConverter;
+    public AtlasEntityChangeNotifier(Set<EntityChangeListener> 
entityChangeListeners, Set<EntityChangeListenerV2> entityChangeListenersV2,
+                                     AtlasInstanceConverter instanceConverter) 
{
+        this.entityChangeListeners   = entityChangeListeners;
+        this.entityChangeListenersV2 = entityChangeListenersV2;
+        this.instanceConverter       = instanceConverter;
     }
 
     public void onEntitiesMutated(EntityMutationResponse 
entityMutationResponse, boolean isImport) throws AtlasBaseException {
@@ -89,63 +94,84 @@ public class AtlasEntityChangeNotifier {
         notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
     }
 
-    public void onClassificationAddedToEntity(String entityId, 
List<AtlasClassification> classifications) throws AtlasBaseException {
-        // Only new classifications need to be used for a partial full text 
string which can be
-        // appended to the existing fullText
-        updateFullTextMapping(entityId, classifications);
+    /**
+     * Notifies listeners that classifications were added to an entity.
+     *
+     * With V2 notification enabled the full-text mapping is rebuilt and every V2 listener is
+     * called with the entity. Otherwise the full-text mapping is updated with the added
+     * classifications, the entity and classifications are converted to their V1 forms and every
+     * V1 listener is called; nothing is sent when the conversion yields no entity or no traits.
+     *
+     * @param entity               entity the classifications were added to
+     * @param addedClassifications classifications that were added
+     * @throws AtlasBaseException with NOTIFICATION_FAILED if a V1 listener fails
+     */
+    public void onClassificationAddedToEntity(AtlasEntity entity, 
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
-        List<Struct>  traits = toStruct(classifications);
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsAdded(entity, addedClassifications);
+            }
+        } else {
+            updateFullTextMapping(entity.getGuid(), addedClassifications);
 
-        if (entity == null || CollectionUtils.isEmpty(traits)) {
-            return;
-        }
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(addedClassifications);
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsAdded(entity, traits);
-            } catch (AtlasException e) {
-                throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitAdd");
+            // guard on the converted reference: 'entity' was already dereferenced above
+            if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsAdded(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitAdd");
+                }
             }
         }
     }
 
-    public void onClassificationDeletedFromEntity(String entityId, 
List<String> traitNames) throws AtlasBaseException {
-        // Since the entity has already been modified in the graph, we need to 
recursively remap the entity
-        doFullTextMapping(entityId);
+    /**
+     * Notifies listeners that classifications on an entity were updated.
+     *
+     * The full-text mapping is rebuilt in both modes. With V2 notification enabled every V2
+     * listener is called with the entity; otherwise the entity and classifications are converted
+     * to their V1 forms and every V1 listener is called, unless the conversion yields no entity
+     * or no traits.
+     *
+     * @param entity                 entity whose classifications were updated
+     * @param updatedClassifications classifications that were updated
+     * @throws AtlasBaseException with NOTIFICATION_FAILED if a V1 listener fails
+     */
+    public void onClassificationUpdatedToEntity(AtlasEntity entity, 
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsUpdated(entity, 
updatedClassifications);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
 
-        if (entity == null || CollectionUtils.isEmpty(traitNames)) {
-            return;
-        }
+            Referenceable entityRef = toReferenceable(entity.getGuid());
+            List<Struct>  traits    = toStruct(updatedClassifications);
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsDeleted(entity, traitNames);
-            } catch (AtlasException e) {
-                throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitDelete");
+            if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+                return;
+            }
+
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsUpdated(entityRef, traits);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitUpdate");
+                }
             }
         }
     }
 
-    public void onClassificationUpdatedToEntity(String entityId, 
List<AtlasClassification> classifications) throws AtlasBaseException {
-        // Since the classification attributes are updated in the graph, we 
need to recursively remap the entityText
-        doFullTextMapping(entityId);
+    /**
+     * Notifies listeners that classifications were removed from an entity.
+     *
+     * The full-text mapping is rebuilt in both modes. With V2 notification enabled every V2
+     * listener is called with the entity; otherwise the entity is converted to its V1 form and
+     * every V1 listener is called, unless the conversion yields no entity or no names were given.
+     *
+     * @param entity                     entity the classifications were removed from
+     * @param deletedClassificationNames names of the removed classifications
+     * @throws AtlasBaseException with NOTIFICATION_FAILED if a V1 listener fails
+     */
+    public void onClassificationDeletedFromEntity(AtlasEntity entity, 
List<String> deletedClassificationNames) throws AtlasBaseException {
+        if (isV2EntityNotificationEnabled()) {
+            doFullTextMapping(entity.getGuid());
+
+            for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+                listener.onClassificationsDeleted(entity, 
deletedClassificationNames);
+            }
+        } else {
+            doFullTextMapping(entity.getGuid());
 
-        Referenceable entity = toReferenceable(entityId);
-        List<Struct>  traits = toStruct(classifications);
+            Referenceable entityRef = toReferenceable(entity.getGuid());
 
-        if (entity == null || CollectionUtils.isEmpty(traits)) {
-            return;
-        }
+            if (entityRef == null || 
CollectionUtils.isEmpty(deletedClassificationNames)) {
+                return;
+            }
 
-        for (EntityChangeListener listener : entityChangeListeners) {
-            try {
-                listener.onTraitsUpdated(entity, traits);
-            } catch (AtlasException e) {
-                throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitUpdate");
+            for (EntityChangeListener listener : entityChangeListeners) {
+                try {
+                    listener.onTraitsDeleted(entityRef, 
deletedClassificationNames);
+                } catch (AtlasException e) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, 
getListenerName(listener), "TraitDelete");
+                }
             }
+
         }
     }
 
@@ -158,6 +184,14 @@ public class AtlasEntityChangeNotifier {
             return;
         }
 
+        if (isV2EntityNotificationEnabled()) {
+            notifyV2Listeners(entityHeaders, operation, isImport);
+        } else {
+            notifyV1Listeners(entityHeaders, operation, isImport);
+        }
+    }
+
+    private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
         List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, 
operation);
 
         for (EntityChangeListener listener : entityChangeListeners) {
@@ -180,7 +214,26 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
-    private List<Referenceable> toReferenceables(List<AtlasEntityHeader> 
entityHeaders, EntityOperation operation) throws AtlasBaseException {
+    /**
+     * Converts the headers to full entities and forwards them to every V2 listener, choosing the
+     * callback by operation: CREATE -> onEntitiesAdded, UPDATE / PARTIAL_UPDATE ->
+     * onEntitiesUpdated, DELETE -> onEntitiesDeleted. Any other operation notifies nobody.
+     *
+     * @param entityHeaders headers of the mutated entities
+     * @param operation     mutation that was performed
+     * @param isImport      flag passed through to the listeners unchanged
+     * @throws AtlasBaseException if converting the headers or a listener call fails
+     */
+    private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
+        List<AtlasEntity> entities = toAtlasEntities(entityHeaders);
+
+        for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+            switch (operation) {
+                case CREATE:
+                    listener.onEntitiesAdded(entities, isImport);
+                    break;
+                case UPDATE:
+                case PARTIAL_UPDATE:
+                    listener.onEntitiesUpdated(entities, isImport);
+                    break;
+                case DELETE:
+                    listener.onEntitiesDeleted(entities, isImport);
+                    break;
+            }
+        }
+    }
+
+        private List<Referenceable> toReferenceables(List<AtlasEntityHeader> 
entityHeaders, EntityOperation operation) throws AtlasBaseException {
         List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
 
         // delete notifications don't need all attributes. Hence the special 
handling for delete operation
@@ -207,7 +260,7 @@ public class AtlasEntityChangeNotifier {
         return ret;
     }
 
-    private List<Struct> toStruct(List<AtlasClassification> classifications) 
throws AtlasBaseException {
+        private List<Struct> toStruct(List<AtlasClassification> 
classifications) throws AtlasBaseException {
         List<Struct> ret = null;
 
         if (classifications != null) {
@@ -223,6 +276,23 @@ public class AtlasEntityChangeNotifier {
         return ret;
     }
 
+    /**
+     * Resolves each header to its full entity through the instance converter.
+     *
+     * Headers for which the converter returns nothing are skipped.
+     *
+     * @param entityHeaders headers to resolve; may be null or empty
+     * @return a new list with the resolved entities, in header order
+     * @throws AtlasBaseException if the converter fails
+     */
+    private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
+        List<AtlasEntity> entities = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(entityHeaders)) {
+            return entities;
+        }
+
+        for (AtlasEntityHeader header : entityHeaders) {
+            AtlasEntityWithExtInfo withExtInfo = instanceConverter.getAndCacheEntity(header.getGuid());
+
+            if (withExtInfo != null) {
+                entities.add(withExtInfo.getEntity());
+            }
+        }
+
+        return entities;
+    }
+
     private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
         if (CollectionUtils.isEmpty(entityHeaders)) {
             return;
@@ -293,4 +363,4 @@ public class AtlasEntityChangeNotifier {
 
         doFullTextMapping(Collections.singletonList(entityHeader));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index ca0eeeb..bf417dd 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -462,54 +462,26 @@ public class AtlasEntityStoreV1 implements 
AtlasEntityStore {
         validateEntityAssociations(guid, classifications);
 
         entityGraphMapper.addClassifications(new EntityMutationContext(), 
guid, classifications);
-
-        // notify listeners on classification addition
-        entityChangeNotifier.onClassificationAddedToEntity(guid, 
classifications);
     }
 
     @Override
     @GraphTransaction
-    public void updateClassifications(String guid, List<AtlasClassification> 
newClassifications) throws AtlasBaseException {
+    public void updateClassifications(String guid, List<AtlasClassification> 
classifications) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating classifications={} for entity={}", 
newClassifications, guid);
+            LOG.debug("Updating classifications={} for entity={}", 
classifications, guid);
         }
 
         if (StringUtils.isEmpty(guid)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"Guid not specified");
         }
 
-        if (CollectionUtils.isEmpty(newClassifications)) {
+        if (CollectionUtils.isEmpty(classifications)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"classifications(s) not specified");
         }
 
         GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
-        List<AtlasClassification> updatedClassifications = new ArrayList<>();
-
-        for (AtlasClassification newClassification : newClassifications) {
-            String              classificationName = 
newClassification.getTypeName();
-            AtlasClassification oldClassification  = getClassification(guid, 
classificationName);
-
-            if (oldClassification == null) {
-                throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
-            }
 
-            validateAndNormalizeForUpdate(newClassification);
-
-            Map<String, Object> newAttrs = newClassification.getAttributes();
-
-            if (MapUtils.isNotEmpty(newAttrs)) {
-                for (String attrName : newAttrs.keySet()) {
-                    oldClassification.setAttribute(attrName, 
newAttrs.get(attrName));
-                }
-            }
-
-            entityGraphMapper.updateClassification(new 
EntityMutationContext(), guid, oldClassification);
-
-            updatedClassifications.add(oldClassification);
-        }
-
-        // notify listeners on update to classifications
-        entityChangeNotifier.onClassificationUpdatedToEntity(guid, 
updatedClassifications);
+        // The per-classification lookup, validation, attribute merge and listener notification
+        // no longer happen here. NOTE(review): presumably entityGraphMapper.updateClassifications
+        // now performs them - confirm against the mapper.
+        entityGraphMapper.updateClassifications(new EntityMutationContext(), 
guid, classifications);
     }
 
     @Override
@@ -533,15 +505,10 @@ public class AtlasEntityStoreV1 implements 
AtlasEntityStore {
         List<AtlasClassification> classifications = 
Collections.singletonList(classification);
 
         for (String guid : guids) {
-            // validate if entity, not already associated with classifications
             validateEntityAssociations(guid, classifications);
 
             entityGraphMapper.addClassifications(new EntityMutationContext(), 
guid, classifications);
-
-            // notify listeners on classification addition
-            entityChangeNotifier.onClassificationAddedToEntity(guid, 
classifications);
         }
-
     }
 
     @Override
@@ -561,16 +528,13 @@ public class AtlasEntityStoreV1 implements 
AtlasEntityStore {
         GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
 
         entityGraphMapper.deleteClassifications(guid, classificationNames);
-
-        // notify listeners on classification deletion
-        entityChangeNotifier.onClassificationDeletedFromEntity(guid, 
classificationNames);
     }
 
     @Override
     @GraphTransaction
     public List<AtlasClassification> getClassifications(String guid) throws 
AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Getting classifications for entities={}", guid);
+            LOG.debug("Getting classifications for entity={}", guid);
         }
 
         EntityGraphRetriever graphRetriever = new 
EntityGraphRetriever(typeRegistry);
@@ -680,24 +644,6 @@ public class AtlasEntityStoreV1 implements 
AtlasEntityStore {
         type.getNormalizedValue(classification);
     }
 
-    private void validateAndNormalizeForUpdate(AtlasClassification 
classification) throws AtlasBaseException {
-        AtlasClassificationType type = 
typeRegistry.getClassificationTypeByName(classification.getTypeName());
-
-        if (type == null) {
-            throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, 
classification.getTypeName());
-        }
-
-        List<String> messages = new ArrayList<>();
-
-        type.validateValueForUpdate(classification, 
classification.getTypeName(), messages);
-
-        if (!messages.isEmpty()) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
messages);
-        }
-
-        type.getNormalizedValueForUpdate(classification);
-    }
-
     /**
      * Validate if classification is not already associated with the entities
      *
@@ -734,7 +680,11 @@ public class AtlasEntityStoreV1 implements 
AtlasEntityStore {
             ret = new ArrayList<>();
 
             for (AtlasClassification classification : classifications) {
-                ret.add(classification.getTypeName());
+                String entityGuid = classification.getEntityGuid();
+
+                if (StringUtils.isEmpty(entityGuid) || 
StringUtils.equalsIgnoreCase(guid, entityGuid)) {
+                    ret.add(classification.getTypeName());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
index 7389f49..28636d8 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
@@ -30,6 +31,7 @@ import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.type.AtlasEntityType;
@@ -54,10 +56,16 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
+import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
 import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
 import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
-import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.BOTH;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static 
org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
 import static 
org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getState;
 import static 
org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getTypeName;
 
@@ -117,8 +125,8 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
 
         AtlasEdge   edge       = graphHelper.getEdgeForGUID(guid);
         String      edgeType   = AtlasGraphUtilsV1.getTypeName(edge);
-        AtlasVertex end1Vertex = edge.getInVertex();
-        AtlasVertex end2Vertex = edge.getOutVertex();
+        AtlasVertex end1Vertex = edge.getOutVertex();
+        AtlasVertex end2Vertex = edge.getInVertex();
 
         // update shouldn't change endType
         if (StringUtils.isNotEmpty(relationship.getTypeName()) && 
!StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
@@ -302,6 +310,8 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
     private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, 
AtlasRelationship relationship) throws AtlasBaseException {
         AtlasRelationshipType relationType = 
typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
 
+        updateTagPropagations(relationshipEdge, 
relationship.getPropagateTags());
+
         AtlasGraphUtilsV1.setProperty(relationshipEdge, 
Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, 
relationship.getPropagateTags().name());
 
         if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
@@ -318,6 +328,46 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
     }
 
+    private void updateTagPropagations(AtlasEdge relationshipEdge, 
PropagateTags tagPropagation) throws AtlasBaseException {
+        PropagateTags oldTagPropagation = getPropagateTags(relationshipEdge);
+        PropagateTags newTagPropagation = tagPropagation;
+
+        if (newTagPropagation != oldTagPropagation) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating tagPropagation property: [ {} -> {} ] for 
relationship: [{} --> {}]", oldTagPropagation.name(),
+                          newTagPropagation.name(), 
getTypeName(relationshipEdge.getOutVertex()), 
getTypeName(relationshipEdge.getInVertex()));
+            }
+
+            if (oldTagPropagation == NONE) {
+                entityRetriever.addTagPropagation(relationshipEdge, 
newTagPropagation);
+            } else if (oldTagPropagation == ONE_TO_TWO) {
+                if (newTagPropagation == NONE || newTagPropagation == 
TWO_TO_ONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, 
oldTagPropagation);
+                }
+
+                if (newTagPropagation != NONE) {
+                    entityRetriever.addTagPropagation(relationshipEdge, 
newTagPropagation);
+                }
+            } else if (oldTagPropagation == TWO_TO_ONE) {
+                if (newTagPropagation == NONE || newTagPropagation == 
ONE_TO_TWO) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, 
oldTagPropagation);
+                }
+
+                if (newTagPropagation != NONE) {
+                    entityRetriever.addTagPropagation(relationshipEdge, 
newTagPropagation);
+                }
+            } else if (oldTagPropagation == BOTH) {
+                if (newTagPropagation == ONE_TO_TWO || newTagPropagation == 
NONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, 
TWO_TO_ONE);
+                }
+
+                if (newTagPropagation == TWO_TO_ONE || newTagPropagation == 
NONE) {
+                    entityRetriever.removeTagPropagation(relationshipEdge, 
ONE_TO_TWO);
+                }
+            }
+        }
+    }
+
     private void validateRelationship(AtlasRelationship relationship) throws 
AtlasBaseException {
         if (relationship == null) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"AtlasRelationship is null");
@@ -462,16 +512,20 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
     }
 
     public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex 
toVertex, String relationshipType) {
-        String    relationshipLabel = getRelationshipEdgeLabel(fromVertex, 
toVertex, relationshipType);
-        AtlasEdge ret               = graphHelper.getEdgeForLabel(fromVertex, 
relationshipLabel);
+        String              relationshipLabel = 
getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
+        Iterator<AtlasEdge> edgesIterator     = 
getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+        AtlasEdge           ret               = null;
 
-        if (ret != null) {
-            AtlasVertex inVertex = ret.getInVertex();
+        while (edgesIterator != null && edgesIterator.hasNext()) {
+            AtlasEdge edge = edgesIterator.next();
 
-            if (inVertex != null) {
-                if 
(!StringUtils.equals(AtlasGraphUtilsV1.getIdFromVertex(inVertex),
-                                        
AtlasGraphUtilsV1.getIdFromVertex(toVertex))) {
-                    ret = null;
+            if (edge != null) {
+                Status status = graphHelper.getStatus(edge);
+
+                if ((status == null || status == ACTIVE) &&
+                        
StringUtils.equals(getIdFromVertex(edge.getInVertex()), 
getIdFromVertex(toVertex))) {
+                    ret = edge;
+                    break;
                 }
             }
         }
@@ -499,11 +553,15 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         return ret;
     }
 
-    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, 
AtlasVertex toVertex, AtlasRelationship relationship) throws 
RepositoryException {
+    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, 
AtlasVertex toVertex, AtlasRelationship relationship) throws 
RepositoryException, AtlasBaseException {
         String        relationshipLabel = getRelationshipEdgeLabel(fromVertex, 
toVertex, relationship.getTypeName());
         PropagateTags tagPropagation    = 
getRelationshipTagPropagation(fromVertex, toVertex, relationship);
         AtlasEdge     ret               = 
graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created relationship edge from [{}] --> [{}] using edge 
label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), 
relationshipLabel);
+        }
+
         // map additional properties to relationship edge
         if (ret != null) {
             final String guid = UUID.randomUUID().toString();
@@ -512,6 +570,9 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
             AtlasGraphUtilsV1.setProperty(ret, 
Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid);
             AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, 
getRelationshipVersion(relationship));
             AtlasGraphUtilsV1.setProperty(ret, 
Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
+
+            // propagate tags
+            entityRetriever.addTagPropagation(ret, tagPropagation);
         }
 
         return ret;
@@ -596,7 +657,7 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
      */
     private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String 
relationshipTypeName) {
         String relationshipEdgeLabel = 
getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
-        Iterator<AtlasEdge> iter     = 
graphHelper.getAdjacentEdgesByLabel(vertex, BOTH, relationshipEdgeLabel);
+        Iterator<AtlasEdge> iter     = 
graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, 
relationshipEdgeLabel);
 
         return (iter != null) ? iter.hasNext() : false;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index 779bc38..0224bf0 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
@@ -36,6 +37,7 @@ import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -51,6 +53,7 @@ import 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.collections.MapUtils;
@@ -63,14 +66,24 @@ import javax.inject.Inject;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.atlas.model.TypeCategory.CLASSIFICATION;
 import static 
org.apache.atlas.model.instance.AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
+import static 
org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.addListProperty;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
 import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.string;
 import static 
org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex;
@@ -81,21 +94,26 @@ import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 public class EntityGraphMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityGraphMapper.class);
 
-    private final GraphHelper            graphHelper = 
GraphHelper.getInstance();
-    private final AtlasGraph             graph;
-    private final DeleteHandlerV1        deleteHandler;
-    private final AtlasTypeRegistry      typeRegistry;
-    private final EntityGraphRetriever   entityRetriever;
-    private final AtlasRelationshipStore relationshipStore;
+    private final GraphHelper               graphHelper = 
GraphHelper.getInstance();
+    private final AtlasGraph                graph;
+    private final DeleteHandlerV1           deleteHandler;
+    private final AtlasTypeRegistry         typeRegistry;
+    private final AtlasRelationshipStore    relationshipStore;
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final AtlasInstanceConverter    instanceConverter;
+    private final EntityGraphRetriever      entityRetriever;
 
     @Inject
     public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry 
typeRegistry, AtlasGraph atlasGraph,
-                             AtlasRelationshipStore relationshipStore) {
-        this.deleteHandler     = deleteHandler;
-        this.typeRegistry      = typeRegistry;
-        this.entityRetriever   = new EntityGraphRetriever(typeRegistry);
-        this.graph             = atlasGraph;
-        this.relationshipStore = relationshipStore;
+                             AtlasRelationshipStore relationshipStore, 
AtlasEntityChangeNotifier entityChangeNotifier,
+                             AtlasInstanceConverter instanceConverter) {
+        this.deleteHandler        = deleteHandler;
+        this.typeRegistry         = typeRegistry;
+        this.graph                = atlasGraph;
+        this.relationshipStore    = relationshipStore;
+        this.entityChangeNotifier = entityChangeNotifier;
+        this.instanceConverter    = instanceConverter;
+        this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
     }
 
     public AtlasVertex createVertex(AtlasEntity entity) {
@@ -241,6 +259,8 @@ public class EntityGraphMapper {
         AtlasVertex ret = createStructVertex(classification);
 
         AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, 
classificationType.getAllSuperTypes());
+        AtlasGraphUtilsV1.setProperty(ret, 
Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid());
+        AtlasGraphUtilsV1.setProperty(ret, 
Constants.CLASSIFICATION_PROPAGATE_KEY, classification.isPropagate());
 
         return ret;
     }
@@ -903,7 +923,7 @@ public class EntityGraphMapper {
 
     private void updateModificationMetadata(AtlasVertex vertex) {
         AtlasGraphUtilsV1.setProperty(vertex, 
Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, 
RequestContextV1.get().getRequestTime());
-        GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, 
RequestContextV1.get().getUser());
+        AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, 
RequestContextV1.get().getUser());
     }
 
     private Long getEntityVersion(AtlasEntity entity) {
@@ -1285,80 +1305,299 @@ public class EntityGraphMapper {
         }
     }
 
-    public void addClassifications(final EntityMutationContext context, String 
guid, List<AtlasClassification> classifications)
-        throws AtlasBaseException {
+    public void addClassifications(final EntityMutationContext context, String 
guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
 
-        if ( CollectionUtils.isNotEmpty(classifications)) {
-
-            AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-            if (instanceVertex == null) {
+            if (entityVertex == null) {
                 throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
             }
 
-            String entityTypeName = 
AtlasGraphUtilsV1.getTypeName(instanceVertex);
-
-            final AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entityTypeName);
+            String                    entityTypeName            = 
AtlasGraphUtilsV1.getTypeName(entityVertex);
+            final AtlasEntityType     entityType                = 
typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasVertex>         propagatedEntityVertices  = null;
+            List<AtlasClassification> propagagedClassifications = null;
 
             for (AtlasClassification classification : classifications) {
+                String  classificationName = classification.getTypeName();
+                boolean propagateTags      = classification.isPropagate();
+
+                // set associated entity id to classification
+                classification.setEntityGuid(guid);
+
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("mapping classification {}", classification);
+                    LOG.debug("Adding classification [{}] to [{}] using edge 
label: [{}]", classificationName, entityTypeName, 
getTraitLabel(classificationName));
                 }
 
-                GraphHelper.addProperty(instanceVertex, 
Constants.TRAIT_NAMES_PROPERTY_KEY, classification.getTypeName());
+                GraphHelper.addProperty(entityVertex, 
TRAIT_NAMES_PROPERTY_KEY, classificationName);
+
                 // add a new AtlasVertex for the struct or trait instance
                 AtlasVertex classificationVertex = 
createClassificationVertex(classification);
+
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("created vertex {} for trait {}", 
string(classificationVertex), classification.getTypeName());
+                    LOG.debug("created vertex {} for trait {}", 
string(classificationVertex), classificationName);
                 }
 
                 // add the attributes for the trait instance
-                mapClassification(EntityOperation.CREATE, context, 
classification, entityType, instanceVertex, classificationVertex);
+                mapClassification(EntityOperation.CREATE, context, 
classification, entityType, entityVertex, classificationVertex);
+
+                if (propagateTags) {
+                    // compute propagatedEntityVertices only once
+                    if (propagatedEntityVertices == null) {
+                        propagatedEntityVertices = 
graphHelper.getImpactedVertices(guid);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Propagating tag: [{}][{}] to {}", 
classificationName, entityTypeName, getTypeNames(propagatedEntityVertices));
+                        }
+
+                        if (propagagedClassifications == null) {
+                            propagagedClassifications = new ArrayList<>();
+                        }
+
+                        propagagedClassifications.add(classification);
+
+                        addTagPropagation(classificationVertex, 
propagatedEntityVertices);
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not propagating classification: 
[{}][{}] - no entities found to propagate to.", 
getTypeName(classificationVertex), entityTypeName);
+                        }
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Not propagating classification: 
[{}][{}] - propagation is disabled.", getTypeName(classificationVertex), 
entityTypeName);
+                    }
+                }
+            }
+
+            // notify listeners on classification addition
+            List<AtlasVertex> notificationVertices = new 
ArrayList<AtlasVertex>() {{ add(entityVertex); }};
+
+            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                notificationVertices.addAll(propagatedEntityVertices);
+            }
+
+            for (AtlasVertex vertex : notificationVertices) {
+                String                    entityGuid           = 
GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo    = 
instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity               = 
(entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<AtlasClassification> addedClassifications = 
StringUtils.equals(entityGuid, guid) ? classifications : 
Collections.emptyList();
+
+                entityChangeNotifier.onClassificationAddedToEntity(entity, 
addedClassifications);
             }
         }
     }
 
-    public void updateClassification(final EntityMutationContext context, 
String guid, AtlasClassification classification)
-                                     throws AtlasBaseException {
+    public void deleteClassifications(String guid, List<String> 
classificationNames) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classificationNames)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
 
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
+            if (entityVertex == null) {
+                throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+            }
 
-        if (instanceVertex == null) {
-            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
+            List<String> traitNames = getTraitNames(entityVertex);
 
-        String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex);
+            validateClassificationExists(traitNames, classificationNames);
 
-        final AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entityTypeName);
+            Set<String> impactedEntities = new HashSet<String>() {{ add(guid); 
}};
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Updating classification {} for entity {}", 
classification, guid);
+            for (String classificationName : classificationNames) {
+                AtlasEdge   classificationEdge   = 
graphHelper.getEdgeForLabel(entityVertex, getTraitLabel(classificationName));
+                AtlasVertex classificationVertex = 
classificationEdge.getInVertex();
+
+                // remove classification from propagated entity vertices
+                boolean propagationEnabled = 
entityRetriever.isPropagationEnabled(classificationVertex);
+
+                if (propagationEnabled) {
+                    List<AtlasVertex> impactedEntityVertices = 
graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex);
+
+                    if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
+                        removeTagPropagation(classificationVertex);
+
+                        for (AtlasVertex impactedEntityVertex : 
impactedEntityVertices) {
+                            
impactedEntities.add(GraphHelper.getGuid(impactedEntityVertex));
+                        }
+                    }
+                }
+
+                // remove classification from associated entity vertex
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Removing classification: [{}] from: [{}][{}] 
with edge label: [{}]", classificationName, getTypeName(entityVertex), guid, 
getTraitLabel(classificationName));
+                }
+
+                deleteHandler.deleteEdgeReference(classificationEdge, 
CLASSIFICATION, false, true, entityVertex);
+
+                traitNames.remove(classificationName);
+            }
+
+            updateTraitNamesProperty(entityVertex, traitNames);
+
+            updateModificationMetadata(entityVertex);
+
+            for (String entityGuid : impactedEntities) {
+                AtlasEntityWithExtInfo    entityWithExtInfo          = 
instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity                     = 
(entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<String>              deletedClassificationNames = 
StringUtils.equals(entityGuid, guid) ? classificationNames : 
Collections.emptyList();
+
+                entityChangeNotifier.onClassificationDeletedFromEntity(entity, 
deletedClassificationNames);
+            }
         }
+    }
+
+    /**
+     * Updates classifications that are directly associated with the entity identified by
+     * {@code guid} and notifies listeners about the change.
+     * <p>
+     * For each supplied classification the request is validated and normalized, its attributes
+     * are merged over the currently stored ones, the classification vertex is re-mapped, and a
+     * change of the 'propagate' flag adds or removes propagation edges. Nothing is done when
+     * {@code classifications} is null or empty.
+     *
+     * @param context         mutation context handed on to the classification mapper
+     * @param guid            guid of the entity whose classifications are updated
+     * @param classifications classifications carrying the updated values
+     * @throws AtlasBaseException INSTANCE_GUID_NOT_FOUND when no vertex exists for {@code guid};
+     *                            CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY when a classification
+     *                            names a different owning entity; CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY
+     *                            when the entity has no edge for the classification; validation
+     *                            failures raised by {@code validateAndNormalizeForUpdate}
+     */
+    public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid);
+
+            if (entityVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+            }
+
+            String                    entityTypeName           = AtlasGraphUtilsV1.getTypeName(entityVertex);
+            AtlasEntityType           entityType               = typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasClassification> updatedClassifications   = new ArrayList<>();
+            List<AtlasVertex>         propagatedEntityVertices = new ArrayList<>();
+
+            for (AtlasClassification classification : classifications) {
+                String classificationName       = classification.getTypeName();
+                String classificationEntityGuid = classification.getEntityGuid();
+
+                // an entityGuid that differs from 'guid' means the classification is owned by another
+                // entity and only reached this one through propagation; it must be updated at its owner
+                if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
+                }
+
+                String    relationshipLabel  = getTraitLabel(entityTypeName, classificationName);
+                AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, relationshipLabel);
+
+                if (classificationEdge == null) {
+                    throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Updating classification {} for entity {}", classification, guid);
+                }
+
+                AtlasVertex         classificationVertex  = classificationEdge.getInVertex();
+                AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
+
+                validateAndNormalizeForUpdate(classification);
+
+                // overlay the supplied attribute values on the stored classification; this merged
+                // view is what listeners are notified with
+                Map<String, Object> classificationAttributes = classification.getAttributes();
+
+                if (MapUtils.isNotEmpty(classificationAttributes)) {
+                    for (Map.Entry<String, Object> attribute : classificationAttributes.entrySet()) {
+                        currentClassification.setAttribute(attribute.getKey(), attribute.getValue());
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
+                }
+
+                mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
+
+                // handle update of 'propagate' flag
+                boolean currentTagPropagation = currentClassification.isPropagate();
+                boolean updatedTagPropagation = classification.isPropagate();
+
+                // compute propagatedEntityVertices once and use it for subsequent iterations and notifications
+                if (CollectionUtils.isEmpty(propagatedEntityVertices)) {
+                    propagatedEntityVertices = (currentTagPropagation) ? graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex) :
+                            graphHelper.getImpactedVertices(guid);
+                }
+
+                if (currentTagPropagation != updatedTagPropagation) {
+                    if (updatedTagPropagation) {
+                        addTagPropagation(classificationVertex, propagatedEntityVertices);
+                    } else {
+                        removeTagPropagation(classificationVertex);
+                    }
 
-        // get the classification vertex from entity
-        String      relationshipLabel    = GraphHelper.getTraitLabel(entityTypeName, classification.getTypeName());
-        AtlasEdge   classificationEdge   = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel);
+                    AtlasGraphUtilsV1.setProperty(classificationVertex, Constants.CLASSIFICATION_PROPAGATE_KEY, updatedTagPropagation);
+                }
+
+                updatedClassifications.add(currentClassification);
+            }
 
-        if (classificationEdge == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "classificationEdge is null for label: " + relationshipLabel);
+            // notify listeners on classification update
+            List<AtlasVertex> notificationVertices = new ArrayList<>();
+
+            notificationVertices.add(entityVertex);
+
+            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                notificationVertices.addAll(propagatedEntityVertices);
+            }
+
+            for (AtlasVertex vertex : notificationVertices) {
+                String                    entityGuid                = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo         = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity                    = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                // only the entity being updated reports the changed classifications; propagated
+                // entities are notified with an empty list
+                List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList();
+
+                entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList);
+            }
         }
+    }
 
-        AtlasVertex classificationVertex = classificationEdge.getInVertex();
+    /**
+     * Propagates a classification to the given entity vertices.
+     * <p>
+     * For every vertex whose entity type the classification type can be applied to, an edge with
+     * the propagated-classification label is added from the entity vertex to the classification
+     * vertex and the classification name is appended to the entity's propagated trait names.
+     * Nothing is done when the vertex list is null/empty or {@code classificationVertex} is null.
+     *
+     * @param classificationVertex     vertex of the classification being propagated
+     * @param propagatedEntityVertices entity vertices that should receive the classification
+     */
+    private void addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) {
+        if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) {
+            String                  classificationName  = getTypeName(classificationVertex);
+            AtlasClassificationType classificationType  = typeRegistry.getClassificationTypeByName(classificationName);
+            // the edge label depends only on the classification name, so derive it once
+            String                  propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName);
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classification.getTypeName());
+            for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) {
+                String          entityTypeName = getTypeName(propagatedEntityVertex);
+                AtlasEntityType entityType     = typeRegistry.getEntityTypeByName(entityTypeName);
+
+                if (classificationType.canApplyToEntityType(entityType)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, entityTypeName,
+                                GraphHelper.getGuid(propagatedEntityVertex), propagatedEdgeLabel);
+                    }
+
+                    graphHelper.addEdge(propagatedEntityVertex, classificationVertex, propagatedEdgeLabel);
+
+                    addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                }
+            }
         }
+    }
 
-        mapClassification(EntityOperation.UPDATE, context, classification, 
entityType, instanceVertex, classificationVertex);
+    /**
+     * Removes a classification from every entity it was propagated to.
+     * <p>
+     * For each incoming edge of the classification vertex that carries the propagated-classification
+     * label, the classification name is dropped from the propagated trait names of the edge's
+     * out-vertex, the edge is deleted, and that vertex's modification metadata is updated.
+     * Nothing is done when {@code classificationVertex} is null.
+     *
+     * @param classificationVertex vertex of the classification whose propagation is withdrawn
+     * @throws AtlasBaseException declared by the signature; presumably raised by the edge
+     *                            deletion helper (not visible here)
+     */
+    private void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException {
+        if (classificationVertex != null) {
+            String              classificationName  = getTypeName(classificationVertex);
+            // the edge label depends only on the classification name, so derive it once
+            String              propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName);
+            Iterator<AtlasEdge> iterator            = getIncomingEdgesByLabel(classificationVertex, propagatedEdgeLabel);
+
+            // remove classification from propagated entity vertices
+            while (iterator != null && iterator.hasNext()) {
+                AtlasEdge propagatedEdge = iterator.next();
+
+                if (propagatedEdge != null) {
+                    AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex();
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
+                                  getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), propagatedEdgeLabel);
+                    }
+
+                    removePropagatedTraitName(propagatedEntityVertex, classificationName);
+
+                    deleteHandler.deleteEdge(propagatedEdge, true);
+
+                    updateModificationMetadata(propagatedEntityVertex);
+                }
+            }
+        }
     }
 
-    private AtlasEdge mapClassification(EntityOperation operation,  final 
EntityMutationContext context, AtlasClassification classification, 
AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex 
traitInstanceVertex)
-        throws AtlasBaseException {
+    private AtlasEdge mapClassification(EntityOperation operation,  final 
EntityMutationContext context, AtlasClassification classification,
+                                        AtlasEntityType entityType, 
AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
+                                        throws AtlasBaseException {
 
         // map all the attributes to this newly created AtlasVertex
         mapAttributes(classification, traitInstanceVertex, operation, context);
 
         // add an edge to the newly created AtlasVertex from the parent
-        String relationshipLabel = 
GraphHelper.getTraitLabel(entityType.getTypeName(), 
classification.getTypeName());
+        String relationshipLabel = getTraitLabel(entityType.getTypeName(), 
classification.getTypeName());
+
         try {
            return graphHelper.getOrCreateEdge(parentInstanceVertex, 
traitInstanceVertex, relationshipLabel);
         } catch (RepositoryException e) {
@@ -1373,54 +1612,40 @@ public class EntityGraphMapper {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
         }
 
-        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+        List<String> traitNames = getTraitNames(instanceVertex);
 
         deleteClassifications(guid, traitNames);
     }
 
-    public void deleteClassifications(String guid, List<String> 
classificationNames) throws AtlasBaseException {
-
-        AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid);
-        if (instanceVertex == null) {
-            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
-        }
+    /**
+     * Removes one classification name from the propagated trait names stored on an entity vertex.
+     * <p>
+     * The current names are read, the given name is removed from that list, the property is
+     * cleared and the remaining names are added back one by one. Nothing is done when
+     * {@code entityVertex} is null or {@code classificationName} is empty.
+     * NOTE(review): the list returned by getPropagatedTraitNames is mutated directly; this
+     * assumes it is non-null and modifiable (confirm against GraphHelper).
+     *
+     * @param entityVertex       entity vertex whose propagated trait names are updated
+     * @param classificationName classification name to remove
+     */
+    private void removePropagatedTraitName(AtlasVertex entityVertex, String 
classificationName) {
+        if (entityVertex != null && 
StringUtils.isNotEmpty(classificationName)) {
+            List<String> propagatedTraitNames = 
getPropagatedTraitNames(entityVertex);
 
-        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+            propagatedTraitNames.remove(classificationName);
 
-        validateClassificationExists(traitNames, classificationNames);
+            // rewrite the property from scratch: clear it, then re-add the remaining names
+            entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
 
-        for (String classificationName : classificationNames) {
-            try {
-                final String entityTypeName = getTypeName(instanceVertex);
-                String relationshipLabel = 
GraphHelper.getTraitLabel(entityTypeName, classificationName);
-                AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, 
relationshipLabel);
-                if (edge != null) {
-                    deleteHandler.deleteEdgeReference(edge, 
TypeCategory.CLASSIFICATION, false, true, instanceVertex);
-
-                    // update the traits in entity once trait removal is 
successful
-                    traitNames.remove(classificationName);
-
-                }
-            } catch (Exception e) {
-                throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+            for (String propagatedTraitName : propagatedTraitNames) {
+                addListProperty(entityVertex, 
PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, propagatedTraitName);
             }
         }
+    }
 
-        // remove the key
-        instanceVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY);
+    /**
+     * Replaces the trait-names property of an entity vertex with the given names: the property
+     * is removed and every name in {@code traitNames} is added back. Nothing is done when
+     * {@code entityVertex} is null.
+     * NOTE(review): {@code traitNames} is iterated without a null check, so callers are
+     * expected to pass a non-null list.
+     *
+     * @param entityVertex entity vertex whose trait names are rewritten
+     * @param traitNames   names to store after the property is cleared
+     */
+    private void updateTraitNamesProperty(AtlasVertex entityVertex, 
List<String> traitNames) {
+        if (entityVertex != null) {
+            entityVertex.removeProperty(TRAIT_NAMES_PROPERTY_KEY);
 
-        // add it back again
-        for (String traitName : traitNames) {
-            GraphHelper.addProperty(instanceVertex, 
Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
+            for (String traitName : traitNames) {
+                GraphHelper.addProperty(entityVertex, 
TRAIT_NAMES_PROPERTY_KEY, traitName);
+            }
         }
-        updateModificationMetadata(instanceVertex);
     }
 
+    /**
+     * Checks that every supplied classification name is contained in the existing names.
+     *
+     * @param existingClassifications names to check against (copied into a set for lookup)
+     * @param suppliedClassifications names that must all be present
+     * @throws AtlasBaseException CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY for the first supplied
+     *                            name that is not among the existing ones
+     */
     private void validateClassificationExists(List<String> 
existingClassifications, List<String> suppliedClassifications) throws 
AtlasBaseException {
         Set<String> existingNames = new HashSet<>(existingClassifications);
         for (String classificationName : suppliedClassifications) {
             if (!existingNames.contains(classificationName)) {
-                throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
+                throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, 
classificationName);
             }
         }
     }
@@ -1489,4 +1714,22 @@ public class EntityGraphMapper {
 
         return currentEntityId;
     }
+
+    /**
+     * Validates a classification supplied for an update and normalizes its values in place.
+     *
+     * @param classification classification to validate and normalize
+     * @throws AtlasBaseException CLASSIFICATION_NOT_FOUND when the type registry has no classification
+     *                            type with this name; INVALID_PARAMETERS when validation reports messages
+     */
+    public void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException {
+        final String                  typeName           = classification.getTypeName();
+        final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName);
+
+        if (classificationType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, typeName);
+        }
+
+        // the type appends one entry per problem it finds
+        final List<String> validationMessages = new ArrayList<>();
+
+        classificationType.validateValueForUpdate(classification, typeName, validationMessages);
+
+        if (validationMessages.size() > 0) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, validationMessages);
+        }
+
+        classificationType.getNormalizedValueForUpdate(classification);
+    }
 }

Reply via email to