http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
new file mode 100644
index 0000000..707ea34
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -0,0 +1,1775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TimeBoundary;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.model.TypeCategory.CLASSIFICATION;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static 
org.apache.atlas.model.instance.AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES;
+import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
+import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
+import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
+import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
+import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
+import static 
org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+import static 
org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
+import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+
+@Component
+public class EntityGraphMapper {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class);
+
+    // Obtained from GraphHelper's static accessor; all other collaborators are set by the constructor.
+    private final GraphHelper               graphHelper = GraphHelper.getInstance();
+    private final AtlasGraph                graph;
+    private final DeleteHandlerV1           deleteHandler;
+    private final AtlasTypeRegistry         typeRegistry;
+    private final AtlasRelationshipStore    relationshipStore;
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
+    private final AtlasInstanceConverter    instanceConverter;
+    private final EntityGraphRetriever      entityRetriever;
+
+    /**
+     * Builds the mapper from its injected collaborators.
+     *
+     * The {@link EntityGraphRetriever} is not injected; it is constructed here
+     * from the supplied type registry.
+     *
+     * @param deleteHandler        handler used to remove edge references
+     * @param typeRegistry         registry used to resolve type names
+     * @param atlasGraph           graph in which vertices are created
+     * @param relationshipStore    relationship store kept for use by this mapper
+     * @param entityChangeNotifier change notifier kept for use by this mapper
+     * @param instanceConverter    instance converter kept for use by this mapper
+     */
+    @Inject
+    public EntityGraphMapper(DeleteHandlerV1 deleteHandler,
+                             AtlasTypeRegistry typeRegistry,
+                             AtlasGraph atlasGraph,
+                             AtlasRelationshipStore relationshipStore,
+                             AtlasEntityChangeNotifier entityChangeNotifier,
+                             AtlasInstanceConverter instanceConverter) {
+        this.graph                = atlasGraph;
+        this.deleteHandler        = deleteHandler;
+        this.typeRegistry         = typeRegistry;
+        this.relationshipStore    = relationshipStore;
+        this.entityChangeNotifier = entityChangeNotifier;
+        this.instanceConverter    = instanceConverter;
+        this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
+    }
+
+    /**
+     * Creates a vertex for the entity under a newly generated random UUID.
+     *
+     * @param entity entity to create a vertex for
+     * @return the vertex produced by {@link #createVertexWithGuid(AtlasEntity, String)}
+     */
+    public AtlasVertex createVertex(AtlasEntity entity) {
+        return createVertexWithGuid(entity, UUID.randomUUID().toString());
+    }
+
+    /**
+     * Creates a vertex for the entity and stamps it with the supplied GUID.
+     *
+     * On top of the properties written by {@code createStructVertex}, the vertex
+     * receives one super-type property per super type of the entity's type, the
+     * GUID, and the entity version.
+     *
+     * @param entity entity to create a vertex for
+     * @param guid   GUID to store on the vertex
+     * @return the newly created vertex
+     */
+    public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createVertex({})", entity.getTypeName());
+        }
+
+        final AtlasEntityType type   = typeRegistry.getEntityTypeByName(entity.getTypeName());
+        final AtlasVertex     vertex = createStructVertex(entity);
+
+        // NOTE(review): 'type' is dereferenced unchecked; an unregistered type name would fail here - confirm callers validate.
+        type.getAllSuperTypes().forEach(
+                superType -> AtlasGraphUtilsV2.addProperty(vertex, Constants.SUPER_TYPES_PROPERTY_KEY, superType));
+
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.GUID_PROPERTY_KEY, guid);
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.VERSION_PROPERTY_KEY, getEntityVersion(entity));
+
+        return vertex;
+    }
+
+    /**
+     * Copies the system attributes present on the entity onto the vertex.
+     *
+     * Status, create time and update time are written only when non-null;
+     * created-by and updated-by only when non-empty. Values absent from the
+     * entity leave the corresponding vertex property untouched.
+     *
+     * @param vertex vertex to update
+     * @param entity entity carrying the system attribute values
+     */
+    public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) {
+        final AtlasEntity.Status status = entity.getStatus();
+        if (status != null) {
+            AtlasGraphUtilsV2.setProperty(vertex, Constants.STATE_PROPERTY_KEY, status.name());
+        }
+
+        final Date created = entity.getCreateTime();
+        if (created != null) {
+            AtlasGraphUtilsV2.setProperty(vertex, Constants.TIMESTAMP_PROPERTY_KEY, created.getTime());
+        }
+
+        final Date updated = entity.getUpdateTime();
+        if (updated != null) {
+            AtlasGraphUtilsV2.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, updated.getTime());
+        }
+
+        final String creator = entity.getCreatedBy();
+        if (StringUtils.isNotEmpty(creator)) {
+            AtlasGraphUtilsV2.setProperty(vertex, Constants.CREATED_BY_KEY, creator);
+        }
+
+        final String modifier = entity.getUpdatedBy();
+        if (StringUtils.isNotEmpty(modifier)) {
+            AtlasGraphUtilsV2.setProperty(vertex, Constants.MODIFIED_BY_KEY, modifier);
+        }
+    }
+
+    public EntityMutationResponse 
mapAttributesAndClassifications(EntityMutationContext context, final boolean 
isPartialUpdate, final boolean replaceClassifications) throws 
AtlasBaseException {
+        EntityMutationResponse resp = new EntityMutationResponse();
+
+        Collection<AtlasEntity> createdEntities = context.getCreatedEntities();
+        Collection<AtlasEntity> updatedEntities = context.getUpdatedEntities();
+
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            for (AtlasEntity createdEntity : createdEntities) {
+                String          guid       = createdEntity.getGuid();
+                AtlasVertex     vertex     = context.getVertex(guid);
+                AtlasEntityType entityType = context.getType(guid);
+
+                compactAttributes(createdEntity);
+
+                mapRelationshipAttributes(createdEntity, vertex, CREATE, 
context);
+
+                mapAttributes(createdEntity, vertex, CREATE, context);
+
+                resp.addEntity(CREATE, constructHeader(createdEntity, 
entityType, vertex));
+                addClassifications(context, guid, 
createdEntity.getClassifications());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(updatedEntities)) {
+            for (AtlasEntity updatedEntity : updatedEntities) {
+                String          guid       = updatedEntity.getGuid();
+                AtlasVertex     vertex     = context.getVertex(guid);
+                AtlasEntityType entityType = context.getType(guid);
+
+                compactAttributes(updatedEntity);
+
+                mapRelationshipAttributes(updatedEntity, vertex, UPDATE, 
context);
+
+                mapAttributes(updatedEntity, vertex, UPDATE, context);
+
+                if (isPartialUpdate) {
+                    resp.addEntity(PARTIAL_UPDATE, 
constructHeader(updatedEntity, entityType, vertex));
+                } else {
+                    resp.addEntity(UPDATE, constructHeader(updatedEntity, 
entityType, vertex));
+                }
+
+                if ( replaceClassifications ) {
+                    deleteClassifications(guid);
+                    addClassifications(context, guid, 
updatedEntity.getClassifications());
+                }
+            }
+        }
+
+        RequestContext req = RequestContext.get();
+
+        for (AtlasObjectId entity : req.getDeletedEntities()) {
+            resp.addEntity(DELETE, entity);
+        }
+
+        for (AtlasObjectId entity : req.getUpdatedEntities()) {
+            if (isPartialUpdate) {
+                resp.addEntity(PARTIAL_UPDATE, entity);
+            }
+            else {
+                resp.addEntity(UPDATE, entity);
+            }
+        }
+
+        return resp;
+    }
+
+    /**
+     * Adds a new vertex to the graph and initialises its common properties:
+     * type name, ACTIVE state, creation and modification timestamps (request
+     * time) and created-by / modified-by (request user).
+     *
+     * @param struct struct whose type name is stored on the vertex
+     * @return the newly added vertex
+     */
+    private AtlasVertex createStructVertex(AtlasStruct struct) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createStructVertex({})", struct.getTypeName());
+        }
+
+        final AtlasVertex vertex = graph.addVertex();
+
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+
+        AtlasGraphUtilsV2.setProperty(vertex, Constants.CREATED_BY_KEY, RequestContext.get().getUser());
+        // NOTE(review): this one property goes through GraphHelper rather than AtlasGraphUtilsV2 - confirm that is intentional.
+        GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContext.get().getUser());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== createStructVertex({})", struct.getTypeName());
+        }
+
+        return vertex;
+    }
+
+    /**
+     * Creates a vertex for a classification instance.
+     *
+     * On top of the properties written by {@code createStructVertex}, the vertex
+     * receives the classification type's super types and the GUID of the entity
+     * the classification is attached to.
+     *
+     * @param classification classification to create a vertex for
+     * @return the newly created classification vertex
+     */
+    private AtlasVertex createClassificationVertex(AtlasClassification classification) {
+        if (LOG.isDebugEnabled()) {
+            // Trace label names this method; it previously read "createVertex", which pointed at the wrong method.
+            LOG.debug("==> createClassificationVertex({})", classification.getTypeName());
+        }
+
+        AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+
+        AtlasVertex ret = createStructVertex(classification);
+
+        // NOTE(review): classificationType is dereferenced unchecked - confirm callers validate the type name first.
+        AtlasGraphUtilsV2.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, classificationType.getAllSuperTypes());
+        AtlasGraphUtilsV2.setProperty(ret, Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid());
+
+        if (LOG.isDebugEnabled()) {
+            // Exit trace added to pair with the entry trace, as createStructVertex does.
+            LOG.debug("<== createClassificationVertex({})", classification.getTypeName());
+        }
+
+        return ret;
+    }
+
+
+    private void mapAttributes(AtlasStruct struct, AtlasVertex vertex, 
EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapAttributes({}, {})", op, struct.getTypeName());
+        }
+
+        if (MapUtils.isNotEmpty(struct.getAttributes())) {
+            AtlasStructType structType = getStructType(struct.getTypeName());
+
+            if (op.equals(CREATE)) {
+                for (AtlasAttribute attribute : 
structType.getAllAttributes().values()) {
+                    Object attrValue = 
struct.getAttribute(attribute.getName());
+
+                    mapAttribute(attribute, attrValue, vertex, op, context);
+                }
+
+            } else if (op.equals(UPDATE)) {
+                for (String attrName : struct.getAttributes().keySet()) {
+                    AtlasAttribute attribute = 
structType.getAttribute(attrName);
+
+                    if (attribute != null) {
+                        Object attrValue = struct.getAttribute(attrName);
+
+                        mapAttribute(attribute, attrValue, vertex, op, 
context);
+                    } else {
+                        LOG.warn("mapAttributes(): invalid attribute {}.{}. 
Ignored..", struct.getTypeName(), attrName);
+                    }
+                }
+            }
+
+            updateModificationMetadata(vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapAttributes({}, {})", op, struct.getTypeName());
+        }
+    }
+
+    private void mapRelationshipAttributes(AtlasEntity entity, AtlasVertex 
vertex, EntityOperation op,
+                                           EntityMutationContext context) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapRelationshipAttributes({}, {})", op, 
entity.getTypeName());
+        }
+
+        if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+            AtlasEntityType entityType = getEntityType(entity.getTypeName());
+
+            if (op.equals(CREATE)) {
+                for (AtlasAttribute attribute : 
entityType.getRelationshipAttributes().values()) {
+                    Object attrValue = 
entity.getRelationshipAttribute(attribute.getName());
+
+                    mapAttribute(attribute, attrValue, vertex, op, context);
+                }
+
+            } else if (op.equals(UPDATE)) {
+                // relationship attributes mapping
+                for (AtlasAttribute attribute : 
entityType.getRelationshipAttributes().values()) {
+                    if (attribute != null && 
entity.hasRelationshipAttribute(attribute.getName())) {
+                        Object attrValue = 
entity.getRelationshipAttribute(attribute.getName());
+
+                        mapAttribute(attribute, attrValue, vertex, op, 
context);
+                    }
+                }
+            }
+
+            updateModificationMetadata(vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapRelationshipAttributes({}, {})", op, 
entity.getTypeName());
+        }
+    }
+
+    private void mapAttribute(AtlasAttribute attribute, Object attrValue, 
AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws 
AtlasBaseException {
+        if (attrValue == null) {
+            AtlasAttributeDef attributeDef = attribute.getAttributeDef();
+            AtlasType attrType = attribute.getAttributeType();
+            if (attrType.getTypeCategory() == TypeCategory.PRIMITIVE) {
+                if (attributeDef.getDefaultValue() != null) {
+                    attrValue = 
attrType.createDefaultValue(attributeDef.getDefaultValue());
+                } else {
+                    if (attribute.getAttributeDef().getIsOptional()) {
+                        attrValue = attrType.createOptionalDefaultValue();
+                    } else {
+                        attrValue = attrType.createDefaultValue();
+                    }
+                }
+            }
+        }
+
+        AttributeMutationContext ctx = new AttributeMutationContext(op, 
vertex, attribute, attrValue);
+
+        mapToVertexByTypeCategory(ctx, context);
+    }
+
+    private Object mapToVertexByTypeCategory(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
+        if (ctx.getOp() == CREATE && ctx.getValue() == null) {
+            return null;
+        }
+
+        switch (ctx.getAttrType().getTypeCategory()) {
+            case PRIMITIVE:
+            case ENUM:
+                return mapPrimitiveValue(ctx);
+
+            case STRUCT: {
+                String    edgeLabel   = 
AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
+                AtlasEdge currentEdge = 
graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
+                AtlasEdge edge        = currentEdge != null ? currentEdge : 
null;
+
+                ctx.setExistingEdge(edge);
+
+                AtlasEdge newEdge = mapStructValue(ctx, context);
+
+                if (currentEdge != null && !currentEdge.equals(newEdge)) {
+                    deleteHandler.deleteEdgeReference(currentEdge, 
ctx.getAttrType().getTypeCategory(), false, true, ctx.getReferringVertex());
+                }
+
+                return newEdge;
+            }
+
+            case OBJECT_ID_TYPE: {
+                AtlasRelationshipEdgeDirection edgeDirection = 
ctx.getAttribute().getRelationshipEdgeDirection();
+                String edgeLabel = 
ctx.getAttribute().getRelationshipEdgeLabel();
+
+                // if relationshipDefs doesn't exist, use legacy way of 
finding edge label.
+                if (StringUtils.isEmpty(edgeLabel)) {
+                    edgeLabel = 
AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
+                }
+
+                String    relationshipGuid = 
getRelationshipGuid(ctx.getValue());
+                AtlasEdge currentEdge;
+
+                // if relationshipGuid is assigned in AtlasRelatedObjectId use 
it to fetch existing AtlasEdge
+                if (StringUtils.isNotEmpty(relationshipGuid) && 
!context.isImport()) {
+                    currentEdge = graphHelper.getEdgeForGUID(relationshipGuid);
+                } else {
+                    currentEdge = 
graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel, edgeDirection);
+                }
+
+                AtlasEdge newEdge = null;
+
+                if (ctx.getValue() != null) {
+                    AtlasEntityType instanceType = 
getInstanceType(ctx.getValue());
+                    AtlasEdge       edge         = currentEdge != null ? 
currentEdge : null;
+
+                    ctx.setElementType(instanceType);
+                    ctx.setExistingEdge(edge);
+
+                    newEdge = mapObjectIdValueUsingRelationship(ctx, context);
+
+                    // legacy case update inverse attribute
+                    if (ctx.getAttribute().getInverseRefAttribute() != null) {
+                        // Update the inverse reference using relationship on 
the target entity
+                        addInverseReference(context, 
ctx.getAttribute().getInverseRefAttribute(), newEdge, 
getRelationshipAttributes(ctx.getValue()));
+                    }
+                }
+
+                // created new relationship,
+                // record entity update on both vertices of the new 
relationship
+                if (currentEdge == null && newEdge != null) {
+
+                    // based on relationship edge direction record update only 
on attribute vertex
+                    if (edgeDirection == IN) {
+                        recordEntityUpdate(newEdge.getOutVertex());
+
+                    } else {
+                        recordEntityUpdate(newEdge.getInVertex());
+                    }
+                }
+
+                // update references, if current and new edge don't match
+                // record entity update on new reference and delete(edge) old 
reference.
+                if (currentEdge != null && !currentEdge.equals(newEdge)) {
+
+                    //record entity update on new edge
+                    if (isRelationshipEdge(newEdge)) {
+                        AtlasVertex attrVertex = 
context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue()));
+
+                        recordEntityUpdate(attrVertex);
+                    }
+
+                    //delete old reference
+                    deleteHandler.deleteEdgeReference(currentEdge, 
ctx.getAttrType().getTypeCategory(), ctx.getAttribute().isOwnedRef(),
+                                                      true, 
ctx.getAttribute().getRelationshipEdgeDirection(), ctx.getReferringVertex());
+                }
+
+                return newEdge;
+            }
+
+            case MAP:
+                return mapMapValue(ctx, context);
+
+            case ARRAY:
+                return mapArrayValue(ctx, context);
+
+            default:
+                throw new 
AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, 
ctx.getAttrType().getTypeCategory().name());
+        }
+    }
+
+    private void addInverseReference(EntityMutationContext context, 
AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> 
relationshipAttributes) throws AtlasBaseException {
+        AtlasStructType inverseType      = inverseAttribute.getDefinedInType();
+        AtlasVertex     inverseVertex    = edge.getInVertex();
+        String          inverseEdgeLabel = 
inverseAttribute.getRelationshipEdgeLabel();
+        AtlasEdge       inverseEdge      = 
graphHelper.getEdgeForLabel(inverseVertex, inverseEdgeLabel);
+        String          propertyName     = 
AtlasGraphUtilsV2.getQualifiedAttributePropertyKey(inverseType, 
inverseAttribute.getName());
+
+        // create new inverse reference
+        AtlasEdge newEdge = createInverseReferenceUsingRelationship(context, 
inverseAttribute, edge, relationshipAttributes);
+
+        boolean inverseUpdated = true;
+        switch (inverseAttribute.getAttributeType().getTypeCategory()) {
+        case OBJECT_ID_TYPE:
+            if (inverseEdge != null) {
+                if (!inverseEdge.equals(newEdge)) {
+                    // Disconnect old reference
+                    deleteHandler.deleteEdgeReference(inverseEdge, 
inverseAttribute.getAttributeType().getTypeCategory(),
+                                                      
inverseAttribute.isOwnedRef(), true, inverseVertex);
+                }
+                else {
+                    // Edge already exists for this attribute between these 
vertices.
+                    inverseUpdated = false;
+                }
+            }
+            break;
+        case ARRAY:
+            // Add edge ID to property value
+            List<String> elements = inverseVertex.getProperty(propertyName, 
List.class);
+            if (newEdge != null && elements == null) {
+                elements = new ArrayList<>();
+                elements.add(newEdge.getId().toString());
+                inverseVertex.setProperty(propertyName, elements);
+            }
+            else {
+               if (newEdge != null && 
!elements.contains(newEdge.getId().toString())) {
+                    elements.add(newEdge.getId().toString());
+                    inverseVertex.setProperty(propertyName, elements);
+               }
+               else {
+                   // Property value list already contains the edge ID.
+                   inverseUpdated = false;
+               }
+            }
+            break;
+        default:
+            break;
+        }
+
+        if (inverseUpdated) {
+            RequestContext requestContext = RequestContext.get();
+
+            if 
(!requestContext.isDeletedEntity(GraphHelper.getGuid(inverseVertex))) {
+                updateModificationMetadata(inverseVertex);
+
+                
requestContext.recordEntityUpdate(entityRetriever.toAtlasObjectId(inverseVertex));
+            }
+        }
+    }
+
+    private AtlasEdge 
createInverseReferenceUsingRelationship(EntityMutationContext context, 
AtlasAttribute inverseAttribute, AtlasEdge edge, Map<String, Object> 
relationshipAttributes) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createInverseReferenceUsingRelationship()");
+        }
+
+        String      inverseAttributeName   = inverseAttribute.getName();
+        AtlasType   inverseAttributeType   = 
inverseAttribute.getDefinedInType();
+        AtlasVertex inverseVertex          = edge.getInVertex();
+        AtlasVertex vertex                 = edge.getOutVertex();
+        AtlasEdge   ret;
+
+        if (inverseAttributeType instanceof AtlasEntityType) {
+            AtlasEntityType entityType = (AtlasEntityType) 
inverseAttributeType;
+
+            if (entityType.hasRelationshipAttribute(inverseAttributeName)) {
+                String relationshipName = 
graphHelper.getRelationshipDefName(inverseVertex, entityType, 
inverseAttributeName);
+
+                ret = getOrCreateRelationship(inverseVertex, vertex, 
relationshipName, relationshipAttributes);
+
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No RelationshipDef defined between {} and {} on 
attribute: {}", inverseAttributeType,
+                              AtlasGraphUtilsV2.getTypeName(vertex), 
inverseAttributeName);
+                }
+                // if no RelationshipDef found, use legacy way to create edges
+                ret = createInverseReference(inverseAttribute, 
(AtlasStructType) inverseAttributeType, inverseVertex, vertex);
+            }
+        } else {
+            // inverseAttribute not of type AtlasEntityType, use legacy way to 
create edges
+            ret = createInverseReference(inverseAttribute, (AtlasStructType) 
inverseAttributeType, inverseVertex, vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== createInverseReferenceUsingRelationship()");
+        }
+
+        updateRelationshipGuidForImport(context, inverseAttributeName, 
inverseVertex, ret);
+
+        return ret;
+    }
+
+    private void updateRelationshipGuidForImport(EntityMutationContext 
context, String inverseAttributeName, AtlasVertex inverseVertex, AtlasEdge 
edge) throws AtlasBaseException {
+        if (!context.isImport()) {
+            return;
+        }
+
+        String parentGuid = GraphHelper.getGuid(inverseVertex);
+        if(StringUtils.isEmpty(parentGuid)) {
+            return;
+        }
+
+        AtlasEntity entity = context.getCreatedOrUpdatedEntity(parentGuid);
+        if(entity == null) {
+            return;
+        }
+
+        String parentRelationshipGuid = 
getRelationshipGuid(entity.getRelationshipAttribute(inverseAttributeName));
+        if(StringUtils.isEmpty(parentRelationshipGuid)) {
+            return;
+        }
+
+        AtlasGraphUtilsV2.setProperty(edge, 
Constants.RELATIONSHIP_GUID_PROPERTY_KEY, parentRelationshipGuid);
+    }
+
+    // legacy method to create edges for inverse reference
+    private AtlasEdge createInverseReference(AtlasAttribute inverseAttribute, 
AtlasStructType inverseAttributeType,
+                                             AtlasVertex inverseVertex, 
AtlasVertex vertex) throws AtlasBaseException {
+
+        String propertyName     = 
AtlasGraphUtilsV2.getQualifiedAttributePropertyKey(inverseAttributeType, 
inverseAttribute.getName());
+        String inverseEdgeLabel = AtlasGraphUtilsV2.getEdgeLabel(propertyName);
+        AtlasEdge ret;
+
+        try {
+            ret = graphHelper.getOrCreateEdge(inverseVertex, vertex, 
inverseEdgeLabel);
+
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        return ret;
+    }
+
+    private Object mapPrimitiveValue(AttributeMutationContext ctx) {
+        AtlasGraphUtilsV2.setProperty(ctx.getReferringVertex(), 
ctx.getVertexProperty(), ctx.getValue());
+
+        return ctx.getValue();
+    }
+
+    /**
+     * Maps a struct-typed attribute value onto the graph.
+     *
+     * When the context already carries an edge, the struct vertex at the edge's IN side is
+     * updated (if the value can be read as a struct) and the existing edge is returned.
+     * Otherwise, if the value can be read as a struct, a new struct vertex is created and
+     * linked from the referring vertex.
+     *
+     * @param ctx     attribute mutation context (value, current edge, referring vertex, property name)
+     * @param context entity mutation context passed through to the vertex create/update helpers
+     * @return the existing or newly created edge; null when there is no current edge and the
+     *         value is null or is neither an AtlasStruct nor a Map
+     * @throws AtlasBaseException if creating or updating the struct vertex fails
+     */
+    private AtlasEdge mapStructValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapStructValue({})", ctx);
+        }
+
+        AtlasEdge   ret         = null;
+        AtlasEdge   currentEdge = ctx.getCurrentEdge();
+        AtlasStruct structVal   = resolveStructValue(ctx);
+
+        if (currentEdge != null) {
+            if (structVal != null) {
+                updateVertex(structVal, currentEdge.getInVertex(), context);
+            }
+
+            // the existing edge is kept even when the value could not be read as a struct
+            ret = currentEdge;
+        } else if (structVal != null) {
+            String edgeLabel = AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
+
+            ret = createVertex(structVal, ctx.getReferringVertex(), edgeLabel, context);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapStructValue({})", ctx);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Reads the context value as an AtlasStruct: returned as-is when it already is one,
+     * built from the attribute type name and the map contents when it is a Map, null otherwise.
+     */
+    private AtlasStruct resolveStructValue(AttributeMutationContext ctx) throws AtlasBaseException {
+        Object value = ctx.getValue();
+
+        if (value instanceof AtlasStruct) {
+            return (AtlasStruct) value;
+        }
+
+        if (value instanceof Map) {
+            return new AtlasStruct(ctx.getAttrType().getTypeName(), (Map) AtlasTypeUtil.toStructAttributes((Map) value));
+        }
+
+        return null;
+    }
+
+    /**
+     * Maps an object-id attribute value to an edge between the referring vertex and the
+     * referenced entity vertex, without using relationship definitions.
+     *
+     * The referenced vertex is resolved through the discovery context, first by guid and
+     * then by object id.
+     *
+     * @param ctx     attribute mutation context (value, current edge, referring vertex)
+     * @param context entity mutation context providing the discovery context
+     * @return the updated or newly obtained edge
+     * @throws AtlasBaseException INVALID_OBJECT_ID when the referenced vertex cannot be resolved;
+     *                            INTERNAL_ERROR when the edge cannot be obtained
+     */
+    private AtlasEdge mapObjectIdValue(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapObjectIdValue({})", ctx);
+        }
+
+        String      referencedGuid = getGuid(ctx.getValue());
+        AtlasVertex targetVertex   = context.getDiscoveryContext().getResolvedEntityVertex(referencedGuid);
+
+        if (targetVertex == null) {
+            // not resolvable by guid - try the full object id
+            AtlasObjectId objectId = getObjectId(ctx.getValue());
+
+            if (objectId != null) {
+                targetVertex = context.getDiscoveryContext().getResolvedEntityVertex(objectId);
+            }
+        }
+
+        if (targetVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
+        }
+
+        AtlasEdge result = null;
+
+        if (ctx.getCurrentEdge() != null) {
+            result = updateEdge(ctx.getAttributeDef(), ctx.getValue(), ctx.getCurrentEdge(), targetVertex);
+        } else if (ctx.getValue() != null) {
+            String label = AtlasGraphUtilsV2.getEdgeLabel(ctx.getVertexProperty());
+
+            try {
+                result = graphHelper.getOrCreateEdge(ctx.getReferringVertex(), targetVertex, label);
+            } catch (RepositoryException e) {
+                throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapObjectIdValue({})", ctx);
+        }
+
+        return result;
+    }
+
+    /**
+     * Maps an object-id attribute value to an edge. A relationship is used when the type of
+     * the referring vertex is an entity type that declares the attribute as a relationship
+     * attribute; in every other case the call is delegated to mapObjectIdValue.
+     *
+     * @param ctx     attribute mutation context (value, referring vertex, current edge, attribute)
+     * @param context entity mutation context; used to resolve the referenced vertex and to detect imports
+     * @return the created or updated edge, or null when the referenced vertex cannot be
+     *         resolved while an import is in progress
+     * @throws AtlasBaseException INVALID_OBJECT_ID when the referenced vertex cannot be
+     *                            resolved outside of an import
+     */
+    private AtlasEdge 
mapObjectIdValueUsingRelationship(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapObjectIdValueUsingRelationship({})", ctx);
+        }
+
+        // resolve the referenced vertex by guid first; the object-id lookup below is the fallback
+        AtlasVertex attributeVertex = 
context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue()));
+        AtlasVertex entityVertex    = ctx.getReferringVertex();
+        AtlasEdge   ret;
+
+        if (attributeVertex == null) {
+            AtlasObjectId objectId = getObjectId(ctx.getValue());
+
+            attributeVertex = (objectId != null) ? 
context.getDiscoveryContext().getResolvedEntityVertex(objectId) : null;
+        }
+
+        if (attributeVertex == null) {
+            // unresolved references are skipped during import instead of failing
+            if(context.isImport()) {
+                return null;
+            }
+
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
(ctx.getValue() == null ? null : ctx.getValue().toString()));
+        }
+
+        String    attributeName = ctx.getAttribute().getName();
+        AtlasType type          = 
typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
+
+        AtlasRelationshipEdgeDirection edgeDirection = 
ctx.getAttribute().getRelationshipEdgeDirection();
+        String                         edgeLabel     = 
ctx.getAttribute().getRelationshipEdgeLabel();
+
+        if (type instanceof AtlasEntityType) {
+            AtlasEntityType entityType = (AtlasEntityType) type;
+
+            // use relationship to create/update edges
+            if (entityType.hasRelationshipAttribute(attributeName)) {
+                Map<String, Object> relationshipAttributes = 
getRelationshipAttributes(ctx.getValue());
+
+                if (ctx.getCurrentEdge() != null) {
+                    ret = updateRelationship(ctx.getCurrentEdge(), 
entityVertex, attributeVertex, edgeDirection, relationshipAttributes);
+
+                } else {
+                    String      relationshipName = 
graphHelper.getRelationshipDefName(entityVertex, entityType, attributeName);
+                    AtlasVertex fromVertex;
+                    AtlasVertex toVertex;
+
+                    // for IN direction the referenced vertex is the edge source; otherwise the referring vertex is
+                    if (edgeDirection == IN) {
+                        fromVertex = attributeVertex;
+                        toVertex   = entityVertex;
+
+                    } else {
+                        fromVertex = entityVertex;
+                        toVertex   = attributeVertex;
+                    }
+                    // existence is checked before getOrCreateRelationship so that a newly created edge can be told apart below
+                    boolean relationshipExists = 
isRelationshipExists(fromVertex, toVertex, edgeLabel);
+
+                    ret = getOrCreateRelationship(fromVertex, toVertex, 
relationshipName, relationshipAttributes);
+
+                    // for import use the relationship guid provided
+                    if (context.isImport()) {
+                        String relationshipGuid = 
getRelationshipGuid(ctx.getValue());
+                        if(!StringUtils.isEmpty(relationshipGuid)) {
+                            AtlasGraphUtilsV2.setProperty(ret, 
Constants.RELATIONSHIP_GUID_PROPERTY_KEY, relationshipGuid);
+                        }
+                    }
+
+                    // if relationship did not exist before and new 
relationship was created
+                    // record entity update on the attribute (referenced) vertex
+                    if (!relationshipExists) {
+                        recordEntityUpdate(attributeVertex);
+                    }
+                }
+            } else {
+                // use legacy way to create/update edges
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No RelationshipDef defined between {} and {} on 
attribute: {}",  getTypeName(entityVertex),
+                               getTypeName(attributeVertex), attributeName);
+                }
+
+                ret = mapObjectIdValue(ctx, context);
+            }
+
+        } else {
+            // if type is StructType having objectid as attribute
+            ret = mapObjectIdValue(ctx, context);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapObjectIdValueUsingRelationship({})", ctx);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Maps a map-typed attribute value onto the referring vertex.
+     *
+     * For reference value types each entry is mapped through mapCollectionElementsToVertex;
+     * resulting edges are tagged with their map key, and entries of the current map that are
+     * not among the new edges are handed to removeUnusedMapEntries. For other value types
+     * the whole map is stored as a single vertex property.
+     *
+     * Nothing is written or removed when the incoming map is null or empty.
+     *
+     * @param ctx     attribute mutation context holding the new map value
+     * @param context entity mutation context passed through to element mapping
+     * @return map of key to mapped value (edges for reference types, raw values otherwise);
+     *         empty when the incoming map is null or empty
+     * @throws AtlasBaseException if mapping an element or removing an unused entry fails
+     */
+    private Map<String, Object> mapMapValue(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapMapValue({})", ctx);
+        }
+
+        Map<Object, Object> newVal      = (Map<Object, Object>) ctx.getValue();
+        Map<String, Object> newMap      = new HashMap<>();
+        AtlasMapType        mapType     = (AtlasMapType) ctx.getAttrType();
+        AtlasAttribute      attribute   = ctx.getAttribute();
+        Map<String, Object> currentMap  = getMapElementsProperty(mapType, 
ctx.getReferringVertex(), ctx.getVertexProperty(), attribute);
+        boolean             isReference = isReference(mapType.getValueType());
+
+        if (MapUtils.isNotEmpty(newVal)) {
+            String propertyName = ctx.getVertexProperty();
+
+            if (isReference) {
+                for (Map.Entry<Object, Object> entry : newVal.entrySet()) {
+                    String    key          = entry.getKey().toString();
+                    AtlasEdge existingEdge = getEdgeIfExists(mapType, 
currentMap, key);
+
+                    AttributeMutationContext mapCtx =  new 
AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), attribute, 
entry.getValue(),
+                                                                               
      propertyName, mapType.getValueType(), existingEdge);
+                    // Add/Update/Remove property value
+                    Object newEntry = mapCollectionElementsToVertex(mapCtx, 
context);
+
+                    // only entries that mapped to an edge are kept in the result
+                    if (newEntry instanceof AtlasEdge) {
+                        AtlasEdge edge = (AtlasEdge) newEntry;
+
+                        edge.setProperty(ATTRIBUTE_KEY_PROPERTY_KEY, key);
+
+                        // If value type indicates this attribute is a 
reference, and the attribute has an inverse reference attribute,
+                        // update the inverse reference value.
+                        AtlasAttribute inverseRefAttribute = 
attribute.getInverseRefAttribute();
+
+                        if (inverseRefAttribute != null) {
+                            // NOTE(review): ctx.getValue() is the whole attribute map, not this entry's value;
+                            // entry.getValue() may have been intended - confirm
+                            addInverseReference(context, inverseRefAttribute, 
edge, getRelationshipAttributes(ctx.getValue()));
+                        }
+
+                        updateInConsistentOwnedMapVertices(ctx, mapType, 
newEntry);
+
+                        newMap.put(key, newEntry);
+                    }
+                }
+
+                // entries whose edge could not be deleted are carried over into the result
+                Map<String, Object> finalMap = 
removeUnusedMapEntries(attribute, ctx.getReferringVertex(), currentMap, newMap);
+                newMap.putAll(finalMap);
+            } else {
+                // primitive type map
+                ctx.getReferringVertex().setProperty(propertyName, new 
HashMap<>(newVal));
+
+                newVal.forEach((key, value) -> newMap.put(key.toString(), 
value));
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapMapValue({})", ctx);
+        }
+
+        return newMap;
+    }
+
+    /**
+     * Maps an array-typed attribute value onto the referring vertex.
+     *
+     * Each new element is mapped through mapCollectionElementsToVertex, paired by position
+     * with the current element at the same index. For reference element types, current
+     * edges that are not among the newly mapped elements are handed to
+     * removeUnusedArrayEntries, and every resulting edge is stamped with its list index.
+     *
+     * @param ctx     attribute mutation context holding the new list value
+     * @param context entity mutation context passed through to element mapping
+     * @return the mapped elements (edges for reference types, raw values otherwise),
+     *         including edges that could not be removed
+     * @throws AtlasBaseException if mapping an element or removing an unused entry fails
+     */
+    public List mapArrayValue(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapArrayValue({})", ctx);
+        }
+
+        AtlasAttribute attribute           = ctx.getAttribute();
+        List           newElements         = (List) ctx.getValue();
+        AtlasArrayType arrType             = (AtlasArrayType) 
attribute.getAttributeType();
+        AtlasType      elementType         = arrType.getElementType();
+        boolean        isReference         = isReference(elementType);
+        AtlasAttribute inverseRefAttribute = 
attribute.getInverseRefAttribute();
+        Cardinality    cardinality         = 
attribute.getAttributeDef().getCardinality();
+        List<Object>   newElementsCreated  = new ArrayList<>();
+        List<Object>   currentElements;
+
+        if (isReference) {
+            currentElements = (List) 
getCollectionElementsUsingRelationship(ctx.getReferringVertex(), attribute);
+        } else {
+            currentElements = (List) getArrayElementsProperty(elementType, 
ctx.getReferringVertex(), ctx.getVertexProperty());
+        }
+
+        if (CollectionUtils.isNotEmpty(newElements)) {
+            // SET cardinality: drop duplicate elements, keeping first-occurrence order
+            if (cardinality == SET) {
+                newElements = (List) 
newElements.stream().distinct().collect(Collectors.toList());
+            }
+
+            for (int index = 0; index < newElements.size(); index++) {
+                AtlasEdge               existingEdge = 
getEdgeAt(currentElements, index, elementType);
+                AttributeMutationContext arrCtx      = new 
AttributeMutationContext(ctx.getOp(), ctx.getReferringVertex(), 
ctx.getAttribute(), newElements.get(index),
+                                                                               
      ctx.getVertexProperty(), elementType, existingEdge);
+
+                Object newEntry = mapCollectionElementsToVertex(arrCtx, 
context);
+
+                if (isReference && newEntry != null && newEntry instanceof 
AtlasEdge && inverseRefAttribute != null) {
+                    // Update the inverse reference value.
+                    AtlasEdge newEdge = (AtlasEdge) newEntry;
+
+                    // NOTE(review): ctx.getValue() is the whole list here (see the cast above), for which
+                    // getRelationshipAttributes returns null; the element value may have been intended - confirm
+                    addInverseReference(context, inverseRefAttribute, newEdge, 
getRelationshipAttributes(ctx.getValue()));
+                }
+
+                if(newEntry != null) {
+                    newElementsCreated.add(newEntry);
+                }
+            }
+        }
+
+        // runs even when the new list is null/empty
+        if (isReference) {
+            List<AtlasEdge> additionalEdges = 
removeUnusedArrayEntries(attribute, (List) currentElements, (List) 
newElementsCreated, ctx.getReferringVertex());
+            newElementsCreated.addAll(additionalEdges);
+        }
+
+        // add index to attributes of array type
+       for (int index = 0; index < newElementsCreated.size(); index++) {
+           Object element = newElementsCreated.get(index);
+
+           if (element instanceof AtlasEdge) {
+               AtlasGraphUtilsV2.setProperty((AtlasEdge) element, 
ATTRIBUTE_INDEX_PROPERTY_KEY, index);
+            }
+        }
+
+        // for dereference on way out
+        setArrayElementsProperty(elementType, ctx.getReferringVertex(), 
ctx.getVertexProperty(), newElementsCreated);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapArrayValue({})", ctx);
+        }
+
+        return newElementsCreated;
+    }
+
+    /**
+     * Creates a vertex for the given struct, maps the struct's attributes onto it with the
+     * CREATE operation, and links it from the referring vertex.
+     *
+     * @param struct          struct instance to persist
+     * @param referringVertex vertex that owns the struct attribute
+     * @param edgeLabel       label of the edge from the referring vertex to the struct vertex
+     * @param context         entity mutation context passed through to attribute mapping
+     * @return the edge between the referring vertex and the new struct vertex
+     * @throws AtlasBaseException INTERNAL_ERROR when the edge cannot be obtained
+     */
+    private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel, EntityMutationContext context) throws AtlasBaseException {
+        final AtlasVertex structVertex = createStructVertex(struct);
+
+        mapAttributes(struct, structVertex, CREATE, context);
+
+        final AtlasEdge structEdge;
+
+        try {
+            //TODO - Map directly in AtlasGraphUtilsV1
+            structEdge = graphHelper.getOrCreateEdge(referringVertex, structVertex, edgeLabel);
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        return structEdge;
+    }
+
+    /**
+     * Re-maps the attributes of the given struct onto an existing vertex using the UPDATE operation.
+     *
+     * @param struct  struct carrying the new attribute values
+     * @param vertex  existing vertex to update
+     * @param context entity mutation context passed through to attribute mapping
+     * @throws AtlasBaseException if attribute mapping fails
+     */
+    private void updateVertex(AtlasStruct struct,
+                              AtlasVertex vertex,
+                              EntityMutationContext context) throws AtlasBaseException {
+        mapAttributes(struct, vertex, UPDATE, context);
+    }
+
+    /**
+     * Returns the version of the given entity, defaulting to 0.
+     *
+     * @param entity entity to read the version from; may be null
+     * @return the entity version, or 0 when the entity or its version is null
+     */
+    private Long getEntityVersion(AtlasEntity entity) {
+        if (entity == null) {
+            return 0L;
+        }
+
+        Long version = entity.getVersion();
+
+        if (version == null) {
+            return 0L;
+        }
+
+        return version;
+    }
+
+    /**
+     * Looks up a type by name and returns it as a struct type.
+     *
+     * @param typeName name of the type to look up
+     * @return the registered struct type
+     * @throws AtlasBaseException TYPE_NAME_INVALID when the registered type is not a struct type
+     */
+    private AtlasStructType getStructType(String typeName) throws AtlasBaseException {
+        final AtlasType registered = typeRegistry.getType(typeName);
+
+        if (registered instanceof AtlasStructType) {
+            return (AtlasStructType) registered;
+        }
+
+        throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, typeName);
+    }
+
+    /**
+     * Looks up a type by name and returns it as an entity type.
+     *
+     * @param typeName name of the type to look up
+     * @return the registered entity type
+     * @throws AtlasBaseException TYPE_NAME_INVALID when the registered type is not an entity type
+     */
+    private AtlasEntityType getEntityType(String typeName) throws AtlasBaseException {
+        final AtlasType registered = typeRegistry.getType(typeName);
+
+        if (registered instanceof AtlasEntityType) {
+            return (AtlasEntityType) registered;
+        }
+
+        throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, typeName);
+    }
+
+    /**
+     * Maps a single collection element according to the category of its type.
+     *
+     * PRIMITIVE, ENUM, MAP and ARRAY elements are returned unchanged; STRUCT elements are
+     * mapped through mapStructValue; OBJECT_ID_TYPE elements have their concrete entity type
+     * set on the context and are mapped through mapObjectIdValueUsingRelationship.
+     *
+     * @param ctx     mutation context describing the element
+     * @param context entity mutation context
+     * @return the raw value, or the edge produced for struct / object-id elements
+     * @throws AtlasBaseException TYPE_CATEGORY_INVALID for any other type category
+     */
+    private Object mapCollectionElementsToVertex(AttributeMutationContext ctx, EntityMutationContext context) throws AtlasBaseException {
+        final TypeCategory category = ctx.getAttrType().getTypeCategory();
+
+        switch (category) {
+            case PRIMITIVE:
+            case ENUM:
+            case MAP:
+            case ARRAY: {
+                return ctx.getValue();
+            }
+
+            case STRUCT: {
+                return mapStructValue(ctx, context);
+            }
+
+            case OBJECT_ID_TYPE: {
+                ctx.setElementType(getInstanceType(ctx.getValue()));
+
+                return mapObjectIdValueUsingRelationship(ctx, context);
+            }
+
+            default: {
+                throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, category.name());
+            }
+        }
+    }
+
+    /**
+     * Reads the given value as an AtlasObjectId.
+     *
+     * @param val an AtlasObjectId, a Map describing one, or null
+     * @return the object id, or null when val is null
+     * @throws AtlasBaseException INVALID_OBJECT_ID when val is of another type, or is a Map
+     *                            that does not yield a valid object id
+     */
+    private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException {
+        if (val == null) {
+            return null;
+        }
+
+        if (val instanceof AtlasObjectId) {
+            return (AtlasObjectId) val;
+        }
+
+        if (val instanceof Map) {
+            AtlasObjectId fromMap = new AtlasObjectId((Map) val);
+
+            if (AtlasTypeUtil.isValid(fromMap)) {
+                return fromMap;
+            }
+        }
+
+        throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+    }
+
+    /**
+     * Extracts the guid from an object-id value.
+     *
+     * @param val an AtlasObjectId, a Map keyed by AtlasObjectId.KEY_GUID, or anything else
+     * @return the guid, or null when val is null, of another type, or carries no guid
+     */
+    private static String getGuid(Object val) throws AtlasBaseException {
+        if (val instanceof AtlasObjectId) {
+            return ((AtlasObjectId) val).getGuid();
+        }
+
+        if (val instanceof Map) {
+            Object rawGuid = ((Map) val).get(AtlasObjectId.KEY_GUID);
+
+            if (rawGuid == null) {
+                return null;
+            }
+
+            return rawGuid.toString();
+        }
+
+        return null;
+    }
+
+    /**
+     * Extracts relationship attributes from an object-id value.
+     *
+     * @param val an AtlasRelatedObjectId, a Map holding KEY_RELATIONSHIP_ATTRIBUTES, or anything else
+     * @return the relationship attributes, or null when none can be extracted
+     */
+    private static Map<String, Object> getRelationshipAttributes(Object val) throws AtlasBaseException {
+        if (val instanceof AtlasRelatedObjectId) {
+            final AtlasStruct attrStruct = ((AtlasRelatedObjectId) val).getRelationshipAttributes();
+
+            if (attrStruct == null) {
+                return null;
+            }
+
+            return attrStruct.getAttributes();
+        }
+
+        if (val instanceof Map) {
+            final Object rawAttrs = ((Map) val).get(KEY_RELATIONSHIP_ATTRIBUTES);
+
+            if (rawAttrs instanceof Map) {
+                return AtlasTypeUtil.toStructAttributes(((Map) rawAttrs));
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Extracts the relationship guid from an object-id value.
+     *
+     * @param val an AtlasRelatedObjectId, a Map keyed by AtlasRelatedObjectId.KEY_RELATIONSHIP_GUID,
+     *            or anything else
+     * @return the relationship guid, or null when none can be extracted
+     */
+    private static String getRelationshipGuid(Object val) throws AtlasBaseException {
+        if (val instanceof AtlasRelatedObjectId) {
+            return ((AtlasRelatedObjectId) val).getRelationshipGuid();
+        }
+
+        if (val instanceof Map) {
+            final Object rawGuid = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_GUID);
+
+            if (rawGuid == null) {
+                return null;
+            }
+
+            return rawGuid.toString();
+        }
+
+        return null;
+    }
+
+    /**
+     * Resolves the entity type named by an object-id value.
+     *
+     * @param val an AtlasObjectId, a Map keyed by AtlasObjectId.KEY_TYPENAME, or null
+     * @return the entity type, or null when val is null
+     * @throws AtlasBaseException INVALID_OBJECT_ID when val is non-null and no entity type
+     *                            can be resolved from it
+     */
+    private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
+        if (val == null) {
+            return null;
+        }
+
+        String typeName = null;
+
+        if (val instanceof AtlasObjectId) {
+            typeName = ((AtlasObjectId) val).getTypeName();
+        } else if (val instanceof Map) {
+            Object rawTypeName = ((Map) val).get(AtlasObjectId.KEY_TYPENAME);
+
+            if (rawTypeName != null) {
+                typeName = rawTypeName.toString();
+            }
+        }
+
+        AtlasEntityType entityType = null;
+
+        if (typeName != null) {
+            entityType = typeRegistry.getEntityTypeByName(typeName);
+        }
+
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+        }
+
+        return entityType;
+    }
+
+    /**
+     * Removes edges of a reference map that are no longer part of the new map.
+     *
+     * @param attribute  the map attribute being updated
+     * @param vertex     vertex owning the attribute
+     * @param currentMap key to edge mapping currently stored
+     * @param newMap     key to edge mapping created/updated by the current mutation
+     * @return entries of currentMap whose edge is not in newMap and was not deleted
+     * @throws AtlasBaseException if deleting an edge reference fails
+     */
+    private Map<String, Object> removeUnusedMapEntries(AtlasAttribute attribute, AtlasVertex vertex, Map<String, Object> currentMap,
+                                                       Map<String, Object> newMap) throws AtlasBaseException {
+        final AtlasMapType        mapType = (AtlasMapType) attribute.getAttributeType();
+        final Map<String, Object> retained = new HashMap<>();
+
+        for (Map.Entry<String, Object> current : currentMap.entrySet()) {
+            final AtlasEdge staleCandidate = (AtlasEdge) current.getValue();
+
+            // edges that are still referenced by the new map are left alone
+            if (newMap.values().contains(staleCandidate)) {
+                continue;
+            }
+
+            final boolean removed = deleteHandler.deleteEdgeReference(staleCandidate, mapType.getValueType().getTypeCategory(), attribute.isOwnedRef(), true, vertex);
+
+            if (!removed) {
+                retained.put(current.getKey(), staleCandidate);
+            }
+        }
+
+        return retained;
+    }
+
+    /**
+     * Returns the edge currently stored under the given key of a reference map.
+     *
+     * @param mapType    type of the map attribute
+     * @param currentMap key to value mapping currently stored
+     * @param keyStr     map key to look up
+     * @return the stored edge, or null when the map's value type is not a reference or no
+     *         value is stored under the key
+     */
+    private static AtlasEdge getEdgeIfExists(AtlasMapType mapType, Map<String, Object> currentMap, String keyStr) {
+        if (!isReference(mapType.getValueType())) {
+            return null;
+        }
+
+        final Object stored = currentMap.get(keyStr);
+
+        return (stored == null) ? null : (AtlasEdge) stored;
+    }
+
+    /**
+     * Re-points an entity reference when the referenced entity has changed.
+     *
+     * The id of the vertex at the IN side of the current edge is compared with the id of
+     * the new entity vertex; when they differ, an edge with the same label is obtained
+     * from the current edge's OUT vertex to the new entity vertex.
+     *
+     * @param attributeDef definition of the reference attribute (used for logging only)
+     * @param value        new attribute value (not used by this method)
+     * @param currentEdge  edge currently representing the reference
+     * @param entityVertex vertex of the entity now being referenced
+     * @return the current edge when the reference is unchanged, otherwise the edge to the new vertex
+     * @throws AtlasBaseException INTERNAL_ERROR when the new edge cannot be obtained
+     */
+    private AtlasEdge updateEdge(AtlasAttributeDef attributeDef, Object value, AtlasEdge currentEdge, final AtlasVertex entityVertex) throws AtlasBaseException {
+        // the original message had two placeholders but only one argument
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating entity reference for reference attribute {}", attributeDef.getName());
+        }
+
+        // Update edge if it exists
+        AtlasVertex currentVertex   = currentEdge.getInVertex();
+        String      currentEntityId = getIdFromVertex(currentVertex);
+        String      newEntityId     = getIdFromVertex(entityVertex);
+
+        AtlasEdge newEdge = currentEdge;
+
+        if (!currentEntityId.equals(newEntityId)) {
+            // add an edge to the class vertex from the instance
+            if (entityVertex != null) {
+                try {
+                    newEdge = graphHelper.getOrCreateEdge(currentEdge.getOutVertex(), entityVertex, currentEdge.getLabel());
+                } catch (RepositoryException e) {
+                    throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+                }
+            }
+        }
+
+        return newEdge;
+    }
+
+
+    /**
+     * Re-points a relationship-backed reference when the referenced entity has changed.
+     *
+     * The id of the entity currently at the other end of the edge (chosen by edge
+     * direction) is compared with the id of the new entity vertex. When they differ, a
+     * relationship of the same type is obtained towards the new vertex and an entity
+     * update is recorded on the new vertex.
+     *
+     * @param currentEdge            edge currently representing the reference
+     * @param parentEntityVertex     vertex of the entity owning the attribute
+     * @param newEntityVertex        vertex of the entity now being referenced
+     * @param edgeDirection          direction of the relationship edge for this attribute
+     * @param relationshipAttributes attributes to set on the relationship; may be null
+     * @return the current edge when the reference is unchanged, otherwise the relationship
+     *         edge to the new vertex
+     * @throws AtlasBaseException if the relationship cannot be obtained
+     */
+    private AtlasEdge updateRelationship(AtlasEdge currentEdge, final AtlasVertex parentEntityVertex, final AtlasVertex newEntityVertex,
+                                         AtlasRelationshipEdgeDirection edgeDirection,  Map<String, Object> relationshipAttributes)
+                                         throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            // the original message had two placeholders but only one argument
+            LOG.debug("Updating entity reference using relationship {} for reference attribute of type {}",
+                      AtlasGraphUtilsV2.getTypeName(currentEdge), getTypeName(newEntityVertex));
+        }
+
+        // Max's manager updated from Jane to Julius (Max.manager --> Jane.subordinates)
+        // manager attribute (OUT direction), current manager vertex (Jane) (IN vertex)
+
+        // Max's mentor updated from John to Jane (John.mentee --> Max.mentor)
+        // mentor attribute (IN direction), current mentee vertex (John) (OUT vertex)
+        String currentEntityId;
+
+        if (edgeDirection == IN) {
+            currentEntityId = getIdFromOutVertex(currentEdge);
+        } else if (edgeDirection == OUT) {
+            currentEntityId = getIdFromInVertex(currentEdge);
+        } else {
+            currentEntityId = getIdFromBothVertex(currentEdge, parentEntityVertex);
+        }
+
+        String    newEntityId = getIdFromVertex(newEntityVertex);
+        AtlasEdge ret         = currentEdge;
+
+        if (!currentEntityId.equals(newEntityId)) {
+            // create a new relationship edge to the new attribute vertex from the instance
+            String relationshipName = AtlasGraphUtilsV2.getTypeName(currentEdge);
+
+            // fall back to the edge label when the edge carries no type name
+            if (relationshipName == null) {
+                relationshipName = currentEdge.getLabel();
+            }
+
+            if (edgeDirection == IN) {
+                ret = getOrCreateRelationship(newEntityVertex, currentEdge.getInVertex(), relationshipName, relationshipAttributes);
+
+            } else if (edgeDirection == OUT) {
+                ret = getOrCreateRelationship(currentEdge.getOutVertex(), newEntityVertex, relationshipName, relationshipAttributes);
+            } else {
+                ret = getOrCreateRelationship(newEntityVertex, parentEntityVertex, relationshipName, relationshipAttributes);
+            }
+
+            //record entity update on new relationship vertex
+            recordEntityUpdate(newEntityVertex);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Reads the list stored under the given vertex property.
+     *
+     * @param elementType        element type of the array attribute
+     * @param vertex             vertex to read from
+     * @param vertexPropertyName name of the property holding the list
+     * @return the list read with AtlasEdge as element class for reference element types,
+     *         otherwise the list read without an element class
+     */
+    public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName) {
+        final boolean elementsAreEdges = isReference(elementType);
+
+        return elementsAreEdges
+                ? (List) vertex.getListProperty(vertexPropertyName, AtlasEdge.class)
+                : (List) vertex.getListProperty(vertexPropertyName);
+    }
+
+    /**
+     * Returns the edge at the given position of the current elements.
+     *
+     * @param currentElements elements currently stored; may be null
+     * @param index           position to read
+     * @param elemType        element type of the array attribute
+     * @return the edge at index, or null when the element type is not a reference, the
+     *         list is null, or index is at or beyond the end of the list
+     */
+    private AtlasEdge getEdgeAt(List<Object> currentElements, int index, AtlasType elemType) {
+        if (!isReference(elemType)) {
+            return null;
+        }
+
+        if (currentElements == null || index >= currentElements.size()) {
+            return null;
+        }
+
+        return (AtlasEdge) currentElements.get(index);
+    }
+
+    /**
+     * Removes edges of the old collection that are not part of the new collection.
+     *
+     * @param attribute      the array attribute being updated
+     * @param currentEntries edges currently stored; may be null or empty
+     * @param newEntries     edges created/updated by the current mutation
+     * @param entityVertex   vertex owning the attribute
+     * @return the edges that should have been removed but were not deleted; empty when
+     *         there is nothing to remove or the element type is not a reference
+     * @throws AtlasBaseException if deleting an edge reference fails
+     */
+    private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries, AtlasVertex entityVertex) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(currentEntries)) {
+            return Collections.emptyList();
+        }
+
+        final AtlasType entryType = ((AtlasArrayType) attribute.getAttributeType()).getElementType();
+
+        if (!isReference(entryType)) {
+            return Collections.emptyList();
+        }
+
+        final Collection<AtlasEdge> staleEdges = CollectionUtils.subtract(currentEntries, newEntries);
+
+        if (CollectionUtils.isEmpty(staleEdges)) {
+            return Collections.emptyList();
+        }
+
+        final List<AtlasEdge> notDeleted = new ArrayList<>();
+
+        for (AtlasEdge staleEdge : staleEdges) {
+            final boolean removed = deleteHandler.deleteEdgeReference(staleEdge, entryType.getTypeCategory(), attribute.isOwnedRef(),
+                                                                      true, attribute.getRelationshipEdgeDirection(), entityVertex);
+
+            if (!removed) {
+                notDeleted.add(staleEdge);
+            }
+        }
+
+        return notDeleted;
+    }
+    /**
+     * Stores the array elements as a vertex property, for non-reference element types only.
+     *
+     * @param elementType        element type of the array attribute
+     * @param vertex             vertex to write to
+     * @param vertexPropertyName name of the property holding the list
+     * @param values             elements to store
+     */
+    private void setArrayElementsProperty(AtlasType elementType, AtlasVertex vertex, String vertexPropertyName, List<Object> values) {
+        if (isReference(elementType)) {
+            // nothing is written to the property for reference element types
+            return;
+        }
+
+        GraphHelper.setProperty(vertex, vertexPropertyName, values);
+    }
+
+    /**
+     * Builds an entity header carrying the entity's type name, the guid read from the
+     * vertex, and the entity's values for every unique attribute of the type.
+     *
+     * @param entity entity supplying the type name and attribute values
+     * @param type   entity type supplying the unique attributes
+     * @param vertex vertex supplying the guid
+     * @return the populated header
+     */
+    private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
+        final AtlasEntityHeader result = new AtlasEntityHeader(entity.getTypeName());
+
+        result.setGuid(getIdFromVertex(vertex));
+
+        for (AtlasAttribute uniqAttribute : type.getUniqAttributes().values()) {
+            final String attrName = uniqAttribute.getName();
+
+            result.setAttribute(attrName, entity.getAttribute(attrName));
+        }
+
+        return result;
+    }
+
+    /**
+     * Builds an entity header from an object id.
+     *
+     * @param id object id supplying type name, guid and unique attributes
+     * @return header populated from the object id
+     */
+    public static AtlasEntityHeader constructHeader(AtlasObjectId id) {
+        final AtlasEntityHeader header = new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes());
+
+        return header;
+    }
+
+    /**
+     * Sets an owned map entry back to ACTIVE when both its edge and the vertex at the
+     * edge's IN side are in DELETED state. Applies only to maps whose value type category
+     * is OBJECT_ID_TYPE.
+     *
+     * @param ctx     mutation context of the map attribute
+     * @param mapType type of the map attribute
+     * @param val     the map entry's edge
+     */
+    private void updateInConsistentOwnedMapVertices(AttributeMutationContext ctx, AtlasMapType mapType, Object val) {
+        if (mapType.getValueType().getTypeCategory() != TypeCategory.OBJECT_ID_TYPE) {
+            return;
+        }
+
+        final AtlasEdge entryEdge = (AtlasEdge) val;
+
+        final boolean needsResurrection = ctx.getAttribute().isOwnedRef()
+                                          && getStatus(entryEdge) == DELETED
+                                          && getStatus(entryEdge.getInVertex()) == DELETED;
+
+        if (needsResurrection) {
+            //Resurrect the vertex and edge to ACTIVE state
+            GraphHelper.setProperty(entryEdge, STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+            GraphHelper.setProperty(entryEdge.getInVertex(), STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
+        }
+    }
+
+    /**
+     * Adds classifications to the entity identified by guid, propagates them where
+     * propagation is enabled, and notifies listeners for the entity and for every entity
+     * the classifications were propagated to.
+     *
+     * Does nothing when the classification list is null or empty. A classification is
+     * skipped when it has propagation enabled and names a different entity guid.
+     * When the propagate flag is not set, it defaults to false during import and to true otherwise.
+     *
+     * @param context         entity mutation context; also used to detect imports
+     * @param guid            guid of the entity to classify
+     * @param classifications classifications to add; copied before being modified
+     * @throws AtlasBaseException INSTANCE_GUID_NOT_FOUND when no vertex exists for guid, or
+     *                            when mapping or propagation fails
+     */
+    public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+            if (entityVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+            }
+
+            final String                                entityTypeName        = AtlasGraphUtilsV2.getTypeName(entityVertex);
+            final AtlasEntityType                       entityType            = typeRegistry.getEntityTypeByName(entityTypeName);
+            List<AtlasVertex>                           entitiesToPropagateTo = null;
+            Map<AtlasVertex, List<AtlasClassification>> propagations          = null;
+            List<AtlasClassification>                   addClassifications    = new ArrayList<>(classifications.size());
+
+            for (AtlasClassification c : classifications) {
+                AtlasClassification classification     = new AtlasClassification(c);
+                String              classificationName = classification.getTypeName();
+                Boolean             propagateTags      = classification.isPropagate();
+
+                // ignore propagated classifications
+                if (propagateTags != null && propagateTags &&
+                        classification.getEntityGuid() != null &&
+                        !StringUtils.equals(classification.getEntityGuid(), guid)) {
+                    continue;
+                }
+
+                if (propagateTags == null) {
+                    if(context.isImport()) {
+                        propagateTags = false;
+                        classification.setPropagate(propagateTags);
+                    } else {
+                        propagateTags = true;
+                    }
+                }
+
+                // set associated entity id to classification
+                if (classification.getEntityGuid() == null) {
+                    classification.setEntityGuid(guid);
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityTypeName, getTraitLabel(classificationName));
+                }
+
+                GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, classificationName);
+
+                // add a new AtlasVertex for the struct or trait instance
+                AtlasVertex classificationVertex = createClassificationVertex(classification);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName);
+                }
+
+                // add the attributes for the trait instance
+                mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex);
+
+                if (propagateTags) {
+                    // compute propagatedEntityVertices only once
+                    if (entitiesToPropagateTo == null) {
+                        entitiesToPropagateTo = graphHelper.getImpactedVertices(guid);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                        if (propagations == null) {
+                            propagations = new HashMap<>(entitiesToPropagateTo.size());
+
+                            for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
+                                propagations.put(entityToPropagateTo, new ArrayList<>());
+                            }
+                        }
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo));
+                        }
+
+                        List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+                        if (entitiesPropagatedTo != null) {
+                            for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
+                                propagations.get(entityPropagatedTo).add(classification);
+                            }
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName);
+                        }
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityTypeName);
+                    }
+                }
+
+                addClassifications.add(classification);
+            }
+
+            // notify listeners on classification addition
+            // (plain list instead of double-brace initialization, which creates an anonymous ArrayList subclass)
+            List<AtlasVertex> notificationVertices = new ArrayList<>();
+
+            notificationVertices.add(entityVertex);
+
+            if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                notificationVertices.addAll(entitiesToPropagateTo);
+            }
+
+            for (AtlasVertex vertex : notificationVertices) {
+                String                    entityGuid           = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo    = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity               = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+                List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? addClassifications : propagations.get(vertex);
+
+                if (CollectionUtils.isNotEmpty(addedClassifications)) {
+                    entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes the named classifications from an entity and, for classifications that have
+     * propagation enabled, from the entities they were propagated to. Listeners are then
+     * notified once per affected entity with the classifications removed from it.
+     *
+     * @param entityGuid          guid of the entity the classifications are directly associated with
+     * @param classificationNames names of the classifications to remove; must not be empty
+     * @throws AtlasBaseException if no names are supplied, the entity cannot be found, the entity
+     *                            carries no classifications, or a supplied name is not associated
+     *                            with the entity
+     */
+    public void deleteClassifications(String entityGuid, List<String> classificationNames) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(classificationNames)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "delete", entityGuid);
+        }
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(entityGuid);
+
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
+        }
+
+        List<String> remainingTraitNames = getTraitNames(entityVertex);
+
+        if (CollectionUtils.isEmpty(remainingTraitNames)) {
+            throw new AtlasBaseException(AtlasErrorCode.NO_CLASSIFICATIONS_FOUND_FOR_ENTITY, entityGuid);
+        }
+
+        validateClassificationExists(remainingTraitNames, classificationNames);
+
+        // affected vertex -> classifications removed from it; drives the notifications below
+        Map<AtlasVertex, List<AtlasClassification>> removalsByVertex = new HashMap<>();
+
+        for (String classificationName : classificationNames) {
+            AtlasVertex         classificationVertex = getClassificationVertex(entityVertex, classificationName);
+            AtlasClassification classification       = entityRetriever.toAtlasClassification(classificationVertex);
+
+            // remove the classification from propagated entities if propagation is turned on
+            if (isPropagationEnabled(classificationVertex)) {
+                List<AtlasVertex> propagatedVertices = deleteHandler.removeTagPropagation(classificationVertex);
+
+                if (CollectionUtils.isNotEmpty(propagatedVertices)) {
+                    for (AtlasVertex propagatedVertex : propagatedVertices) {
+                        removalsByVertex.computeIfAbsent(propagatedVertex, key -> new ArrayList<>()).add(classification);
+                    }
+                }
+            }
+
+            // record the removal against the directly associated entity as well
+            removalsByVertex.computeIfAbsent(entityVertex, key -> new ArrayList<>()).add(classification);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName,
+                           getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL);
+            }
+
+            // drop the entity -> classification edge and forget the trait name
+            AtlasEdge classificationEdge = getClassificationEdge(entityVertex, classificationVertex);
+
+            deleteHandler.deleteEdgeReference(classificationEdge, CLASSIFICATION, false, true, entityVertex);
+
+            remainingTraitNames.remove(classificationName);
+        }
+
+        updateTraitNamesProperty(entityVertex, remainingTraitNames);
+
+        updateModificationMetadata(entityVertex);
+
+        // notify listeners for the entity itself and for every entity the tags had been propagated to
+        for (Map.Entry<AtlasVertex, List<AtlasClassification>> removal : removalsByVertex.entrySet()) {
+            String                    affectedGuid           = GraphHelper.getGuid(removal.getKey());
+            List<AtlasClassification> deletedClassifications = removal.getValue();
+            AtlasEntityWithExtInfo    affectedWithExtInfo    = instanceConverter.getAndCacheEntity(affectedGuid);
+            AtlasEntity               affectedEntity         = (affectedWithExtInfo != null) ? affectedWithExtInfo.getEntity() : null;
+
+            entityChangeNotifier.onClassificationDeletedFromEntity(affectedEntity, deletedClassifications);
+        }
+    }
+
+    /**
+     * Updates classifications that are already directly associated with an entity: attributes,
+     * validity periods and the 'propagate' flag. Turning propagation on adds the classification to
+     * the impacted entities; turning it off removes it from them. Listeners are notified of the
+     * update for the entity and the impacted entities, and of the deletion for every entity a
+     * classification stopped propagating to.
+     *
+     * @param context         mutation context handed through to the attribute mapping
+     * @param guid            guid of the entity owning the classifications
+     * @param classifications classifications carrying the updated values; must not be empty
+     * @throws AtlasBaseException if no classifications are supplied, the entity cannot be found,
+     *                            a classification names a different entity guid, a classification
+     *                            is not associated with the entity, or validation of a
+     *                            classification fails
+     */
+    public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(classifications)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);
+        }
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        String                    entityTypeName         = AtlasGraphUtilsV2.getTypeName(entityVertex);
+        AtlasEntityType           entityType             = typeRegistry.getEntityTypeByName(entityTypeName);
+        List<AtlasClassification> updatedClassifications = new ArrayList<>();
+        List<AtlasVertex>         entitiesToPropagateTo  = new ArrayList<>();
+
+        // NOTE(review): addedPropagations is populated but not read later in this method
+        Map<AtlasVertex, List<AtlasClassification>> addedPropagations   = null;
+        Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null;
+
+        for (AtlasClassification classification : classifications) {
+            String classificationName       = classification.getTypeName();
+            String classificationEntityGuid = classification.getEntityGuid();
+
+            if (StringUtils.isEmpty(classificationEntityGuid)) {
+                classification.setEntityGuid(guid);
+            }
+
+            // a classification that names another entity was propagated here and cannot be updated from this entity
+            if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) {
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
+            }
+
+            AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
+
+            if (classificationVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating classification {} for entity {}", classification, guid);
+            }
+
+            AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex);
+
+            validateAndNormalizeForUpdate(classification);
+
+            boolean isClassificationUpdated = false;
+
+            // check for attribute update
+            Map<String, Object> updatedAttributes = classification.getAttributes();
+
+            if (MapUtils.isNotEmpty(updatedAttributes)) {
+                for (Map.Entry<String, Object> updatedAttribute : updatedAttributes.entrySet()) {
+                    currentClassification.setAttribute(updatedAttribute.getKey(), updatedAttribute.getValue());
+                }
+
+                isClassificationUpdated = true;
+            }
+
+            // check for validity period update
+            List<TimeBoundary> currentValidityPeriods = currentClassification.getValidityPeriods();
+            List<TimeBoundary> updatedValidityPeriods = classification.getValidityPeriods();
+
+            if (!Objects.equals(currentValidityPeriods, updatedValidityPeriods)) {
+                currentClassification.setValidityPeriods(updatedValidityPeriods);
+
+                isClassificationUpdated = true;
+            }
+
+            // impacted vertices are computed once and reused for later iterations and for notifications
+            if (isClassificationUpdated && CollectionUtils.isEmpty(entitiesToPropagateTo)) {
+                entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay());
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName);
+            }
+
+            mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex);
+
+            // handle update of 'propagate' flag
+            Boolean currentTagPropagation = currentClassification.isPropagate();
+            Boolean updatedTagPropagation = classification.isPropagate();
+
+            // compare the boxed flags by value; '!=' on Boolean objects compares references
+            if (updatedTagPropagation != null && !Objects.equals(currentTagPropagation, updatedTagPropagation)) {
+                if (updatedTagPropagation) {
+                    if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
+                        entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay());
+                    }
+
+                    if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+                        if (addedPropagations == null) {
+                            addedPropagations = new HashMap<>(entitiesToPropagateTo.size());
+
+                            for (AtlasVertex entityToPropagateTo : entitiesToPropagateTo) {
+                                addedPropagations.put(entityToPropagateTo, new ArrayList<>());
+                            }
+                        }
+
+                        List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+                        if (entitiesPropagatedTo != null) {
+                            for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) {
+                                // tolerate a vertex that was not pre-registered above instead of failing with an NPE
+                                addedPropagations.computeIfAbsent(entityPropagatedTo, key -> new ArrayList<>()).add(classification);
+                            }
+                        }
+                    }
+                } else {
+                    List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex);
+
+                    if (CollectionUtils.isNotEmpty(impactedVertices)) {
+                        if (removedPropagations == null) {
+                            removedPropagations = new HashMap<>();
+                        }
+
+                        // record every classification whose propagation was switched off, not only the first one
+                        for (AtlasVertex impactedVertex : impactedVertices) {
+                            removedPropagations.computeIfAbsent(impactedVertex, key -> new ArrayList<>()).add(classification);
+                        }
+                    }
+                }
+            }
+
+            updatedClassifications.add(currentClassification);
+        }
+
+        // notify listeners on classification update
+        List<AtlasVertex> notificationVertices = new ArrayList<>();
+
+        notificationVertices.add(entityVertex);
+
+        if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) {
+            notificationVertices.addAll(entitiesToPropagateTo);
+        }
+
+        for (AtlasVertex vertex : notificationVertices) {
+            String                    entityGuid        = GraphHelper.getGuid(vertex);
+            AtlasEntityWithExtInfo    entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
+            AtlasEntity               entity            = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+
+            entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+        }
+
+        // notify listeners on classifications that stopped propagating
+        if (removedPropagations != null) {
+            for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedPropagations.entrySet()) {
+                AtlasVertex               vertex                 = entry.getKey();
+                List<AtlasClassification> removedClassifications = entry.getValue();
+                String                    entityGuid             = GraphHelper.getGuid(vertex);
+                AtlasEntityWithExtInfo    entityWithExtInfo      = instanceConverter.getAndCacheEntity(entityGuid);
+                AtlasEntity               entity                 = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null;
+
+                entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications);
+            }
+        }
+    }
+
+    /**
+     * Writes a classification's validity periods, propagate flag and attributes onto the
+     * classification vertex, then returns the edge between the entity vertex and the
+     * classification vertex, adding that edge when none is found.
+     *
+     * @param operation            operation passed through to the attribute mapping
+     * @param context              mutation context passed through to the attribute mapping
+     * @param classification       classification holding the values to store
+     * @param entityType           type of the owning entity (not used in this method body)
+     * @param parentInstanceVertex vertex of the entity the classification belongs to
+     * @param traitInstanceVertex  vertex of the classification
+     * @return the existing or newly added classification edge
+     * @throws AtlasBaseException propagated from the calls made here
+     */
+    private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
+                                        AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
+                                        throws AtlasBaseException {
+        // if 'null', don't update existing value in the classification
+        if (classification.getValidityPeriods() != null) {
+            AtlasGraphUtilsV2.setProperty(traitInstanceVertex, Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY,
+                                          AtlasJson.toJson(classification.getValidityPeriods()));
+        }
+
+        // the propagate flag is likewise written only when it was supplied
+        if (classification.isPropagate() != null) {
+            AtlasGraphUtilsV2.setProperty(traitInstanceVertex, Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY, classification.isPropagate());
+        }
+
+        // map all the attributes to the classification vertex
+        mapAttributes(classification, traitInstanceVertex, operation, context);
+
+        AtlasEdge existingEdge = getClassificationEdge(parentInstanceVertex, traitInstanceVertex);
+
+        if (existingEdge != null) {
+            return existingEdge;
+        }
+
+        return graphHelper.addClassificationEdge(parentInstanceVertex, traitInstanceVertex, false);
+    }
+
+    /**
+     * Removes every classification currently listed on the entity with the given guid.
+     * Does nothing when the entity has no trait names.
+     *
+     * @param guid guid of the entity to strip of its classifications
+     * @throws AtlasBaseException if no entity vertex exists for the guid, or if the removal fails
+     */
+    public void deleteClassifications(String guid) throws AtlasBaseException {
+        final AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+        if (entityVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        final List<String> currentTraitNames = getTraitNames(entityVertex);
+
+        if (CollectionUtils.isEmpty(currentTraitNames)) {
+            return;
+        }
+
+        deleteClassifications(guid, currentTraitNames);
+    }
+
+    /**
+     * Replaces the trait-names property of an entity vertex: the property is removed and then
+     * re-added once per supplied name. A null vertex is ignored.
+     *
+     * @param entityVertex vertex whose trait-names property is rewritten; may be null
+     * @param traitNames   names to store on the vertex
+     */
+    private void updateTraitNamesProperty(AtlasVertex entityVertex, List<String> traitNames) {
+        if (entityVertex == null) {
+            return;
+        }
+
+        entityVertex.removeProperty(TRAIT_NAMES_PROPERTY_KEY);
+
+        for (String name : traitNames) {
+            GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, name);
+        }
+    }
+
+    private void validateClassificationExists(List<String> 
existingClassifications, List<String> suppliedClassifications) throws 
AtlasBaseException {
+        Set<String> existingNames = new HashSet<>(existingClassifications);
+        for (String classificationName : suppliedClassifications) {
+            if (!existingNames.contains(classificationName)) {
+                throw new 
AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, 
classificationName);
+            }
+        }
+    }
+
+    /**
+     * Delegates to the relationship store's {@code getOrCreate} for a relationship with the given
+     * name and attributes between the two end vertices.
+     *
+     * @param end1Vertex             vertex at the first end of the relationship
+     * @param end2Vertex             vertex at the second end of the relationship
+     * @param relationshipName       name used to build the relationship
+     * @param relationshipAttributes attributes used to build the relationship
+     * @return the edge returned by the relationship store
+     * @throws AtlasBaseException propagated from the relationship store
+     */
+    private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName,
+                                              Map<String, Object> relationshipAttributes) throws AtlasBaseException {
+        AtlasRelationship relationship = new AtlasRelationship(relationshipName, relationshipAttributes);
+
+        return relationshipStore.getOrCreate(end1Vertex, end2Vertex, relationship);
+    }
+
+    /**
+     * Tells whether an outgoing edge with the given label leads from one vertex to another.
+     * Vertices are matched by the id obtained from {@code getIdFromVertex}.
+     *
+     * @param fromVertex vertex whose outgoing edges are inspected
+     * @param toVertex   vertex expected at the other end of an edge
+     * @param edgeLabel  label of the edges to inspect
+     * @return {@code true} as soon as a matching edge is found, {@code false} when there is none
+     */
+    private boolean isRelationshipExists(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
+        Iterator<AtlasEdge> edges = graphHelper.getOutGoingEdgesByLabel(fromVertex, edgeLabel);
+
+        while (edges != null && edges.hasNext()) {
+            AtlasEdge   edge     = edges.next();
+            AtlasVertex inVertex = edge.getInVertex();
+
+            // stop at the first match; the remaining edges cannot change the answer
+            if (inVertex != null && StringUtils.equals(getIdFromVertex(inVertex), getIdFromVertex(toVertex))) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void recordEntityUpdate(AtlasVertex vertex) throws 
AtlasBaseException {
+        RequestContext req = RequestContext.get();
+
+        if (!req.isUpdatedEntity(GraphHelper.getGuid(vertex))) {
+            updateModificationMetadata(vertex);
+
+            req.recordEntityUpdate(entityRetriever.toAtlasObjectId(vertex));
+        }
+    }
+
+    private static void compactAttributes(AtlasEntity entity) {
+        if (entity != null) {
+            Map<String, Object> relationshipAttributes = 
entity.getRelationshipAttributes();
+            Map<String, Object> attributes = entity.getAttributes();
+
+            if (MapUtils.isNotEmpty(relationshipAttributes) && 
MapUtils.isNotEmpty(attributes)) {
+                for (String attrName : relationshipAttributes.keySet()) {
+                    if (attributes.containsKey(attrName)) {
+                        entity.removeAttribute(attrName);
+                    }
+                }
+            }
+        }
+    }
+
+    private String getIdFromInVertex(AtlasEdge edge) {
+        return getIdFromVertex(edge.getInVertex());
+    }
+
+    private String getIdFromOutVertex(AtlasEdge edge) {
+        return getIdFromVertex(edge.getOutVertex());
+    }
+
+    private String getIdFromBothVertex(AtlasEdge currentEdge, AtlasVertex 
parentEntityVertex) {
+        String parentEntityId  = getIdFromVertex(parentEntityVertex);
+        String currentEntityId = getIdFromVertex(currentEdge.getInVertex());
+
+        if (StringUtils.equals(currentEntityId, parentEntityId)) {
+            currentEntityId = getIdFromOutVertex(currentEdge);
+        }
+
+
+        return currentEntityId;
+    }
+
+    /**
+     * Validates a classification for update against its registered classification type and then
+     * asks the type to normalize it for update.
+     *
+     * @param classification classification to validate and normalize
+     * @throws AtlasBaseException if the classification type is not registered, or if validation
+     *                            reports any messages
+     */
+    public void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException {
+        final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+
+        if (classificationType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
+        }
+
+        // validation appends its findings here; any entry means the update is rejected
+        final List<String> validationMessages = new ArrayList<>();
+
+        classificationType.validateValueForUpdate(classification, classification.getTypeName(), validationMessages);
+
+        if (!validationMessages.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, validationMessages);
+        }
+
+        classificationType.getNormalizedValueForUpdate(classification);
+    }
+}

Reply via email to