This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-1.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-1.0 by this push: new 14f0c67 ATLAS-3055: updated entity create/update to handle relationship attributes consistently 14f0c67 is described below commit 14f0c6707db37ba6e3e575a56228037f8eaa19d0 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Thu Feb 21 09:54:05 2019 -0800 ATLAS-3055: updated entity create/update to handle relationship attributes consistently (cherry picked from commit 18019733f28448458e2cad94f5e67aabd18316be) --- .../org/apache/atlas/type/AtlasEntityType.java | 6 +- .../apache/atlas/GraphTransactionInterceptor.java | 22 ++++++ .../org/apache/atlas/glossary/GlossaryUtils.java | 8 +- .../graph/v2/AtlasEntityGraphDiscoveryV2.java | 9 ++- .../store/graph/v2/AtlasEntityStoreV2.java | 31 ++++++-- .../store/graph/v2/AtlasGraphUtilsV2.java | 19 +++-- .../store/graph/v2/EntityGraphMapper.java | 92 ++++++++++++++++------ 7 files changed, 138 insertions(+), 49 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java index 7166caa..8960703 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java @@ -709,10 +709,10 @@ public class AtlasEntityType extends AtlasStructType { String attributeName = attribute.getName(); AtlasAttributeDef attributeDef = attribute.getAttributeDef(); - if (((AtlasEntity) obj).hasRelationshipAttribute(attributeName)) { - Object attributeValue = getNormalizedValue(entityObj.getAttribute(attributeName), attributeDef); + if (entityObj.hasRelationshipAttribute(attributeName)) { + Object attributeValue = getNormalizedValue(entityObj.getRelationshipAttribute(attributeName), attributeDef); - obj.setAttribute(attributeName, attributeValue); + entityObj.setRelationshipAttribute(attributeName, attributeValue); } } } diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index cbd2226..d0db58a 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.NotFoundException; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import javax.ws.rs.core.Response; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,6 +49,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>(); private static final ThreadLocal<Boolean> isTxnOpen = ThreadLocal.withInitial(() -> Boolean.FALSE); private static final ThreadLocal<Boolean> innerFailure = ThreadLocal.withInitial(() -> Boolean.FALSE); + private static final ThreadLocal<Map<String, AtlasVertex>> guidVertexCache = ThreadLocal.withInitial(() -> new HashMap<>()); private final AtlasGraph graph; @@ -112,6 +115,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { // Reset the boolean flags isTxnOpen.set(Boolean.FALSE); innerFailure.set(Boolean.FALSE); + guidVertexCache.get().clear(); List<PostTransactionHook> trxHooks = postTransactionHooks.get(); @@ -172,6 +176,24 @@ public class GraphTransactionInterceptor implements MethodInterceptor { OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids); } + public static void addToVertexCache(String guid, AtlasVertex vertex) { + Map<String, AtlasVertex> cache = guidVertexCache.get(); + + cache.put(guid, vertex); + } + + public static void removeFromVertexCache(String guid) { + Map<String, AtlasVertex> cache = guidVertexCache.get(); + + cache.remove(guid); + } + + public static AtlasVertex getVertexFromCache(String guid) { + Map<String, AtlasVertex> cache = guidVertexCache.get(); + + return cache.get(guid); + } + boolean logException(Throwable t) { if (t instanceof AtlasBaseException) { Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java index cec4c23..9625f94 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java @@ -88,13 +88,7 @@ public abstract class GlossaryUtils { protected void createRelationship(AtlasRelationship relationship) throws AtlasBaseException { - try { - relationshipStore.create(relationship); - } catch (AtlasBaseException e) { - if (!e.getAtlasErrorCode().equals(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS)) { - throw e; - } - } + relationshipStore.getOrCreate(relationship); } protected void updateRelationshipAttributes(AtlasRelationship relationship, AtlasRelatedTermHeader relatedTermHeader) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java index ddab2bf..23dc83a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java @@ -331,12 +331,15 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery { private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException { for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { - AtlasType attrType = attribute.getAttributeType(); String attrName = attribute.getName(); - Object attrVal = entity.getRelationshipAttribute(attrName); + // if attribute is not in 'relationshipAttributes', try 'attributes' if (entity.hasRelationshipAttribute(attrName)) { - visitAttribute(attrType, attrVal); + visitAttribute(attribute.getAttributeType(), entity.getRelationshipAttribute(attrName)); + + visitedAttributes.add(attrName); + } else if (entity.hasAttribute(attrName)) { + visitAttribute(attribute.getAttributeType(), entity.getAttribute(attrName)); visitedAttributes.add(attrName); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 8af264b..a622fb5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -727,15 +727,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); boolean hasUpdates = false; - if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { - hasUpdates = true; // if relationship attributes are provided, assume there is an update - } - if (!hasUpdates) { hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import } - if (!hasUpdates) { + if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated continue; @@ -756,6 +752,27 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { } } + if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change + for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { + if (!entity.getRelationshipAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated + continue; + } + + Object newVal = entity.getRelationshipAttribute(attribute.getName()); + Object currVal = entityRetriever.getEntityAttribute(vertex, attribute); + + if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) { + hasUpdates = true; + + if (LOG.isDebugEnabled()) { + LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal); + } + + break; + } + } + } + // if classifications are to be replaced, then skip updates only when no change in classifications if (!hasUpdates && replaceClassifications) { List<AtlasClassification> newVal = entity.getClassifications(); @@ -775,7 +792,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { entitiesToSkipUpdate = new ArrayList<>(); } - LOG.info("skipping unchanged entity: {}", entity); + if (LOG.isDebugEnabled()) { + LOG.debug("skipping unchanged entity: {}", entity); + } entitiesToSkipUpdate.add(entity); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 2548537..798b362 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.SortOrder; import org.apache.atlas.discovery.SearchProcessor; @@ -324,14 +325,22 @@ public class AtlasGraphUtilsV2 { } public static AtlasVertex findByGuid(String guid) { - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() - .has(Constants.GUID_PROPERTY_KEY, guid); + AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid); - Iterator<AtlasVertex> results = query.vertices().iterator(); + if (ret == null) { + AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() + .has(Constants.GUID_PROPERTY_KEY, guid); - AtlasVertex vertex = results.hasNext() ? results.next() : null; + Iterator<AtlasVertex> results = query.vertices().iterator(); - return vertex; + ret = results.hasNext() ? results.next() : null; + + if (ret != null) { + GraphTransactionInterceptor.addToVertexCache(guid, ret); + } + } + + return ret; } public static String getTypeNameFromGuid(String guid) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 173fe02..baaca0b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TimeBoundary; @@ -135,6 +136,8 @@ public class EntityGraphMapper { AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid); AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(entity)); + GraphTransactionInterceptor.addToVertexCache(guid, ret); + return ret; } @@ -186,11 +189,11 @@ public class EntityGraphMapper { AtlasVertex vertex = context.getVertex(guid); AtlasEntityType entityType = context.getType(guid); - compactAttributes(createdEntity); + compactAttributes(createdEntity, entityType); - mapRelationshipAttributes(createdEntity, vertex, CREATE, context); + mapRelationshipAttributes(createdEntity, entityType, vertex, CREATE, context); - mapAttributes(createdEntity, vertex, CREATE, context); + mapAttributes(createdEntity, entityType, vertex, CREATE, context); resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex)); addClassifications(context, guid, createdEntity.getClassifications()); @@ -203,11 +206,11 @@ public class EntityGraphMapper { AtlasVertex vertex = context.getVertex(guid); AtlasEntityType entityType = context.getType(guid); - compactAttributes(updatedEntity); + compactAttributes(updatedEntity, entityType); - mapRelationshipAttributes(updatedEntity, vertex, UPDATE, context); + mapRelationshipAttributes(updatedEntity, entityType, vertex, UPDATE, context); - mapAttributes(updatedEntity, vertex, UPDATE, context); + mapAttributes(updatedEntity, entityType, vertex, UPDATE, context); if (isPartialUpdate) { resp.addEntity(PARTIAL_UPDATE, constructHeader(updatedEntity, entityType, vertex)); @@ -283,8 +286,11 @@ public class EntityGraphMapper { return ret; } - private void mapAttributes(AtlasStruct struct, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException { + mapAttributes(struct, getStructType(struct.getTypeName()), vertex, op, context); + } + + private void mapAttributes(AtlasStruct struct, AtlasStructType structType, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> mapAttributes({}, {})", op, struct.getTypeName()); } @@ -292,8 +298,6 @@ public class EntityGraphMapper { if (MapUtils.isNotEmpty(struct.getAttributes())) { MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes"); - AtlasStructType structType = getStructType(struct.getTypeName()); - if (op.equals(CREATE)) { for (AtlasAttribute attribute : structType.getAllAttributes().values()) { Object attrValue = struct.getAttribute(attribute.getName()); @@ -325,7 +329,7 @@ public class EntityGraphMapper { } } - private void mapRelationshipAttributes(AtlasEntity entity, AtlasVertex vertex, EntityOperation op, + private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entityType, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> mapRelationshipAttributes({}, {})", op, entity.getTypeName()); @@ -334,8 +338,6 @@ public class EntityGraphMapper { if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes"); - AtlasEntityType entityType = getEntityType(entity.getTypeName()); - if (op.equals(CREATE)) { for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { Object attrValue = entity.getRelationshipAttribute(attribute.getName()); @@ -439,7 +441,7 @@ public class EntityGraphMapper { AtlasEdge newEdge = null; if (ctx.getValue() != null) { - AtlasEntityType instanceType = getInstanceType(ctx.getValue()); + AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context); AtlasEdge edge = currentEdge != null ? currentEdge : null; ctx.setElementType(instanceType); @@ -1090,7 +1092,7 @@ public class EntityGraphMapper { return mapStructValue(ctx, context); case OBJECT_ID_TYPE: - AtlasEntityType instanceType = getInstanceType(ctx.getValue()); + AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context); ctx.setElementType(instanceType); if (ctx.getAttributeDef().isSoftReferenced()) { return mapSoftRefValue(ctx, context); @@ -1163,23 +1165,50 @@ public class EntityGraphMapper { return null; } - private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException { + private AtlasEntityType getInstanceType(Object val, EntityMutationContext context) throws AtlasBaseException { AtlasEntityType ret = null; if (val != null) { String typeName = null; + String guid = null; if (val instanceof AtlasObjectId) { - typeName = ((AtlasObjectId)val).getTypeName(); + AtlasObjectId objId = (AtlasObjectId) val; + + typeName = objId.getTypeName(); + guid = objId.getGuid(); } else if (val instanceof Map) { - Object typeNameVal = ((Map)val).get(AtlasObjectId.KEY_TYPENAME); + Map map = (Map) val; + + Object typeNameVal = map.get(AtlasObjectId.KEY_TYPENAME); + Object guidVal = map.get(AtlasObjectId.KEY_GUID); if (typeNameVal != null) { typeName = typeNameVal.toString(); } + + if (guidVal != null) { + guid = guidVal.toString(); + } } - ret = typeName != null ? typeRegistry.getEntityTypeByName(typeName) : null; + if (typeName == null) { + if (guid != null) { + ret = context.getType(guid); + + if (ret == null) { + AtlasVertex vertex = context.getDiscoveryContext().getResolvedEntityVertex(guid); + + if (vertex != null) { + typeName = AtlasGraphUtilsV2.getTypeName(vertex); + } + } + } + } + + if (ret == null && typeName != null) { + ret = typeRegistry.getEntityTypeByName(typeName); + } if (ret == null) { throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); @@ -1868,15 +1897,28 @@ public class EntityGraphMapper { } } - private static void compactAttributes(AtlasEntity entity) { + // move/remove relationship-attributes present in 'attributes' + private static void compactAttributes(AtlasEntity entity, AtlasEntityType entityType) { if (entity != null) { - Map<String, Object> relationshipAttributes = entity.getRelationshipAttributes(); - Map<String, Object> attributes = entity.getAttributes(); + for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { + String attrName = attribute.getName(); + + if (entity.hasAttribute(attrName)) { + Object attrValue = entity.getAttribute(attrName); - if (MapUtils.isNotEmpty(relationshipAttributes) && MapUtils.isNotEmpty(attributes)) { - for (String attrName : relationshipAttributes.keySet()) { - if (attributes.containsKey(attrName)) { - entity.removeAttribute(attrName); + if (LOG.isDebugEnabled()) { + LOG.debug("relationship attribute {}.{} is present in entity, removing it", entityType.getTypeName(), attrName); + } + + entity.removeAttribute(attrName); + + if (attrValue != null) { // relationship attribute is present in 'attributes' + // if the attribute doesn't exist in relationshipAttributes, add it + Object relationshipAttrValue = entity.getRelationshipAttribute(attrName); + + if (relationshipAttrValue == null) { + entity.setRelationshipAttribute(attrName, attrValue); + } } } }