http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index d61bff2..9d56fa9 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -26,10 +26,12 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; @@ -55,6 +57,9 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.script.Bindings; +import javax.script.ScriptEngine; +import javax.script.ScriptException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -68,6 +73,14 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; +import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL; /** * Utility class for graph operations. @@ -80,6 +93,8 @@ public final class GraphHelper { public static final String RETRY_COUNT = "atlas.graph.storage.num.retries"; public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms"; + private final AtlasGremlinQueryProvider queryProvider = AtlasGremlinQueryProvider.INSTANCE; + private static volatile GraphHelper INSTANCE; private AtlasGraph graph; @@ -166,7 +181,7 @@ public final class GraphHelper { return vertexWithoutIdentity; } - private AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) { + public AtlasEdge addEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) { if (LOG.isDebugEnabled()) { LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex)); } @@ -266,6 +281,26 @@ public final class GraphHelper { return (AtlasEdge) findElement(false, args); } + public static boolean edgeExists(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + boolean ret = false; + Iterator<AtlasEdge> edges = getAdjacentEdgesByLabel(inVertex, AtlasEdgeDirection.IN, edgeLabel); + + while (edges != null && edges.hasNext()) { + AtlasEdge edge = edges.next(); + + if (edge.getOutVertex().equals(outVertex)) { + Status edgeState = getStatus(edge); + + if (edgeState == null || edgeState == ACTIVE) { + ret = true; + break; + } + } + } + + return ret; + } + private AtlasElement findElement(boolean isVertexSearch, Object... args) throws EntityNotFoundException { AtlasGraphQuery query = graph.query(); @@ -522,7 +557,7 @@ public final class GraphHelper { } /** - * Adds an additional value to a multi-property. + * Adds an additional value to a multi-property (SET). * * @param vertex * @param propertyName @@ -539,6 +574,23 @@ public final class GraphHelper { } /** + * Adds an additional value to a multi-property (LIST). + * + * @param vertex + * @param propertyName + * @param value + */ + public static void addListProperty(AtlasVertex vertex, String propertyName, Object value) { + String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding property {} = \"{}\" to vertex {}", actualPropertyName, value, string(vertex)); + } + + vertex.addListProperty(actualPropertyName, value); + } + + /** * Remove the specified edge from the graph. * * @param edge @@ -635,6 +687,73 @@ public final class GraphHelper { return result; } + public List<AtlasVertex> getIncludedImpactedVerticesWithReferences(AtlasVertex entityVertex, String relationshipGuid) throws AtlasBaseException { + List<AtlasVertex> ret = new ArrayList<>(); + List<AtlasVertex> impactedVertices = getImpactedVerticesWithReferences(getGuid(entityVertex), relationshipGuid); + + ret.add(entityVertex); + + if (CollectionUtils.isNotEmpty(impactedVertices)) { + ret.addAll(impactedVertices); + } + + return ret; + } + + public List<AtlasVertex> getImpactedVertices(String guid) throws AtlasBaseException { + ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); + Bindings bindings = scriptEngine.createBindings(); + String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES); + List<AtlasVertex> ret = new ArrayList<>(); + + bindings.put("g", graph); + bindings.put("guid", guid); + + try { + Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false); + + if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) { + List<?> results = (List) resultObj; + Object firstElement = results.get(0); + + if (firstElement instanceof AtlasVertex) { + ret = (List<AtlasVertex>) results; + } + } + } catch (ScriptException e) { + throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e); + } + + return ret; + } + + public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException { + ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); + Bindings bindings = scriptEngine.createBindings(); + String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL); + List<AtlasVertex> ret = new ArrayList<>(); + + bindings.put("g", graph); + bindings.put("guid", guid); + bindings.put("relationshipGuid", relationshipGuid); + + try { + Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false); + + if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) { + List<?> results = (List) resultObj; + Object firstElement = results.get(0); + + if (firstElement instanceof AtlasVertex) { + ret = (List<AtlasVertex>) results; + } + } + } catch (ScriptException e) { + throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e); + } + + return ret; + } /** * Finds the Vertices that correspond to the given GUIDs. GUIDs @@ -655,13 +774,60 @@ public final class GraphHelper { return attrName; } + public static String getTraitLabel(String traitName) { + return traitName; + } + + public static String getPropagatedEdgeLabel(String classificationName) { + return "propagated:" + classificationName; + } + + public static List<String> getAllTraitNames(AtlasVertex<?, ?> entityVertex) { + ArrayList<String> ret = new ArrayList<>(); + + if (entityVertex != null) { + Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class); + + if (CollectionUtils.isNotEmpty(traitNames)) { + ret.addAll(traitNames); + } + + traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class); + + if (CollectionUtils.isNotEmpty(traitNames)) { + ret.addAll(traitNames); + } + } + + return ret; + } + public static List<String> getTraitNames(AtlasVertex<?,?> entityVertex) { - ArrayList<String> traits = new ArrayList<>(); - Collection<String> propertyValues = entityVertex.getPropertyValues(Constants.TRAIT_NAMES_PROPERTY_KEY, String.class); - for(String value : propertyValues) { - traits.add(value); + ArrayList<String> ret = new ArrayList<>(); + + if (entityVertex != null) { + Collection<String> traitNames = entityVertex.getPropertyValues(TRAIT_NAMES_PROPERTY_KEY, String.class); + + if (CollectionUtils.isNotEmpty(traitNames)) { + ret.addAll(traitNames); + } } - return traits; + + return ret; + } + + public static List<String> getPropagatedTraitNames(AtlasVertex<?,?> entityVertex) { + ArrayList<String> ret = new ArrayList<>(); + + if (entityVertex != null) { + Collection<String> traitNames = entityVertex.getPropertyValues(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class); + + if (CollectionUtils.isNotEmpty(traitNames)) { + ret.addAll(traitNames); + } + } + + return ret; } public static List<String> getSuperTypeNames(AtlasVertex<?,?> entityVertex) { @@ -691,6 +857,10 @@ public final class GraphHelper { return getIdFromVertex(getTypeName(vertex), vertex); } + public static String getRelationshipGuid(AtlasElement element) { + return element.getProperty(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, String.class); + } + public static String getGuid(AtlasElement element) { return element.<String>getProperty(Constants.GUID_PROPERTY_KEY, String.class); } @@ -743,6 +913,25 @@ public final class GraphHelper { return element.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); } + public List<AtlasVertex> getPropagatedEntityVerticesFromClassification(AtlasVertex classificationVertex) { + List<AtlasVertex> ret = new ArrayList<>(); + + if (classificationVertex != null) { + String classificationName = getTypeName(classificationVertex); + Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName)); + + while (iterator != null && iterator.hasNext()) { + AtlasEdge propagatedEdge = iterator.next(); + + if (propagatedEdge != null) { + ret.add(propagatedEdge.getOutVertex()); + } + } + } + + return ret; + } + /** * For the given type, finds an unique attribute and checks if there is an existing instance with the same * unique value @@ -881,6 +1070,41 @@ public final class GraphHelper { return Collections.emptyList(); } + public static List<String> getTypeNames(List<AtlasVertex> vertices) { + List<String> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(vertices)) { + for (AtlasVertex vertex : vertices) { + String entityTypeProperty = vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class); + + if (entityTypeProperty != null) { + ret.add(getTypeName(vertex)); + } + } + } + + return ret; + } + + public static AtlasVertex getAssociatedEntityVertex(AtlasVertex classificationVertex) { + AtlasVertex ret = null; + + if (classificationVertex != null) { + Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getTypeName(classificationVertex)); + + while (iterator != null && iterator.hasNext()) { + AtlasEdge edge = iterator.next(); + + if (edge != null) { + ret = edge.getOutVertex(); + break; + } + } + } + + return ret; + } + /** * Guid and AtlasVertex combo */ @@ -919,7 +1143,8 @@ public final class GraphHelper { } } - /** + /* + /** * Get the GUIDs and vertices for all composite entities owned/contained by the specified root entity AtlasVertex. * The graph is traversed from the root entity through to the leaf nodes of the containment graph. * @@ -1299,4 +1524,51 @@ public final class GraphHelper { return StringUtils.isNotEmpty(edge.getLabel()) ? edgeLabel.startsWith("r:") : false; } + + public static AtlasObjectId getReferenceObjectId(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection, + AtlasVertex parentVertex) { + AtlasObjectId ret = null; + + if (relationshipDirection == OUT) { + ret = getAtlasObjectIdForInVertex(edge); + } else if (relationshipDirection == IN) { + ret = getAtlasObjectIdForOutVertex(edge); + } else if (relationshipDirection == BOTH){ + // since relationship direction is BOTH, edge direction can be inward or outward + // compare with parent entity vertex and pick the right reference vertex + if (verticesEquals(parentVertex, edge.getOutVertex())) { + ret = getAtlasObjectIdForInVertex(edge); + } else { + ret = getAtlasObjectIdForOutVertex(edge); + } + } + + return ret; + } + + public static AtlasObjectId getAtlasObjectIdForOutVertex(AtlasEdge edge) { + return new AtlasObjectId(getGuid(edge.getOutVertex()), getTypeName(edge.getOutVertex())); + } + + public static AtlasObjectId getAtlasObjectIdForInVertex(AtlasEdge edge) { + return new AtlasObjectId(getGuid(edge.getInVertex()), getTypeName(edge.getInVertex())); + } + + private static boolean verticesEquals(AtlasVertex vertexA, AtlasVertex vertexB) { + return StringUtils.equals(getGuid(vertexB), getGuid(vertexA)); + } + + public static void removePropagatedTraitNameFromVertex(AtlasVertex entityVertex, String propagatedTraitName) { + List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex); + + if (CollectionUtils.isNotEmpty(propagatedTraitNames) && propagatedTraitNames.contains(propagatedTraitName)) { + propagatedTraitNames.remove(propagatedTraitName); + + entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY); + + for (String pTraitName : propagatedTraitNames) { + GraphHelper.addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, pTraitName); + } + } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 2b6bead..b9945d4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -20,13 +20,14 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; + import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.v1.model.instance.Referenceable; @@ -49,22 +50,26 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; + @Component public class AtlasEntityChangeNotifier { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); - private final Set<EntityChangeListener> entityChangeListeners; - private final AtlasInstanceConverter instanceConverter; + private final Set<EntityChangeListener> entityChangeListeners; + private final Set<EntityChangeListenerV2> entityChangeListenersV2; + private final AtlasInstanceConverter instanceConverter; @Inject private FullTextMapperV2 fullTextMapperV2; @Inject - public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, - AtlasInstanceConverter instanceConverter) { - this.entityChangeListeners = entityChangeListeners; - this.instanceConverter = instanceConverter; + public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, Set<EntityChangeListenerV2> entityChangeListenersV2, + AtlasInstanceConverter instanceConverter) { + this.entityChangeListeners = entityChangeListeners; + this.entityChangeListenersV2 = entityChangeListenersV2; + this.instanceConverter = instanceConverter; } public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { @@ -89,63 +94,84 @@ public class AtlasEntityChangeNotifier { notifyListeners(deletedEntities, EntityOperation.DELETE, isImport); } - public void onClassificationAddedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException { - // Only new classifications need to be used for a partial full text string which can be - // appended to the existing fullText - updateFullTextMapping(entityId, classifications); + public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); - Referenceable entity = toReferenceable(entityId); - List<Struct> traits = toStruct(classifications); + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsAdded(entity, addedClassifications); + } + } else { + updateFullTextMapping(entity.getGuid(), addedClassifications); - if (entity == null || CollectionUtils.isEmpty(traits)) { - return; - } + Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(addedClassifications); - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsAdded(entity, traits); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd"); + if (entity == null || CollectionUtils.isEmpty(traits)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsAdded(entityRef, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd"); + } } } } - public void onClassificationDeletedFromEntity(String entityId, List<String> traitNames) throws AtlasBaseException { - // Since the entity has already been modified in the graph, we need to recursively remap the entity - doFullTextMapping(entityId); + public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); - Referenceable entity = toReferenceable(entityId); + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsUpdated(entity, updatedClassifications); + } + } else { + doFullTextMapping(entity.getGuid()); - if (entity == null || CollectionUtils.isEmpty(traitNames)) { - return; - } + Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(updatedClassifications); - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsDeleted(entity, traitNames); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); + if (entityRef == null || CollectionUtils.isEmpty(traits)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsUpdated(entityRef, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate"); + } } } } - public void onClassificationUpdatedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException { - // Since the classification attributes are updated in the graph, we need to recursively remap the entityText - doFullTextMapping(entityId); + public void onClassificationDeletedFromEntity(AtlasEntity entity, List<String> deletedClassificationNames) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsDeleted(entity, deletedClassificationNames); + } + } else { + doFullTextMapping(entity.getGuid()); - Referenceable entity = toReferenceable(entityId); - List<Struct> traits = toStruct(classifications); + Referenceable entityRef = toReferenceable(entity.getGuid()); - if (entity == null || CollectionUtils.isEmpty(traits)) { - return; - } + if (entityRef == null || CollectionUtils.isEmpty(deletedClassificationNames)) { + return; + } - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsUpdated(entity, traits); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate"); + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsDeleted(entityRef, deletedClassificationNames); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); + } } + } } @@ -158,6 +184,14 @@ public class AtlasEntityChangeNotifier { return; } + if (isV2EntityNotificationEnabled()) { + notifyV2Listeners(entityHeaders, operation, isImport); + } else { + notifyV1Listeners(entityHeaders, operation, isImport); + } + } + + private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation); for (EntityChangeListener listener : entityChangeListeners) { @@ -180,7 +214,26 @@ public class AtlasEntityChangeNotifier { } } - private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { + private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { + List<AtlasEntity> entities = toAtlasEntities(entityHeaders); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + switch (operation) { + case CREATE: + listener.onEntitiesAdded(entities, isImport); + break; + case UPDATE: + case PARTIAL_UPDATE: + listener.onEntitiesUpdated(entities, isImport); + break; + case DELETE: + listener.onEntitiesDeleted(entities, isImport); + break; + } + } + } + + private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { List<Referenceable> ret = new ArrayList<>(entityHeaders.size()); // delete notifications don't need all attributes. Hence the special handling for delete operation @@ -207,7 +260,7 @@ public class AtlasEntityChangeNotifier { return ret; } - private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException { + private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException { List<Struct> ret = null; if (classifications != null) { @@ -223,6 +276,23 @@ public class AtlasEntityChangeNotifier { return ret; } + private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException { + List<AtlasEntity> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(entityHeaders)) { + for (AtlasEntityHeader entityHeader : entityHeaders) { + String entityGuid = entityHeader.getGuid(); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + + if (entityWithExtInfo != null) { + ret.add(entityWithExtInfo.getEntity()); + } + } + } + + return ret; + } + private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) { if (CollectionUtils.isEmpty(entityHeaders)) { return; @@ -293,4 +363,4 @@ public class AtlasEntityChangeNotifier { doFullTextMapping(Collections.singletonList(entityHeader)); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index ca0eeeb..bf417dd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -462,54 +462,26 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { validateEntityAssociations(guid, classifications); entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); - - // notify listeners on classification addition - entityChangeNotifier.onClassificationAddedToEntity(guid, classifications); } @Override @GraphTransaction - public void updateClassifications(String guid, List<AtlasClassification> newClassifications) throws AtlasBaseException { + public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("Updating classifications={} for entity={}", newClassifications, guid); + LOG.debug("Updating classifications={} for entity={}", classifications, guid); } if (StringUtils.isEmpty(guid)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified"); } - if (CollectionUtils.isEmpty(newClassifications)) { + if (CollectionUtils.isEmpty(classifications)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); } GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); - List<AtlasClassification> updatedClassifications = new ArrayList<>(); - - for (AtlasClassification newClassification : newClassifications) { - String classificationName = newClassification.getTypeName(); - AtlasClassification oldClassification = getClassification(guid, classificationName); - - if (oldClassification == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); - } - validateAndNormalizeForUpdate(newClassification); - - Map<String, Object> newAttrs = newClassification.getAttributes(); - - if (MapUtils.isNotEmpty(newAttrs)) { - for (String attrName : newAttrs.keySet()) { - oldClassification.setAttribute(attrName, newAttrs.get(attrName)); - } - } - - entityGraphMapper.updateClassification(new EntityMutationContext(), guid, oldClassification); - - updatedClassifications.add(oldClassification); - } - - // notify listeners on update to classifications - entityChangeNotifier.onClassificationUpdatedToEntity(guid, updatedClassifications); + entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications); } @Override @@ -533,15 +505,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { List<AtlasClassification> classifications = Collections.singletonList(classification); for (String guid : guids) { - // validate if entity, not already associated with classifications validateEntityAssociations(guid, classifications); entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); - - // notify listeners on classification addition - entityChangeNotifier.onClassificationAddedToEntity(guid, classifications); } - } @Override @@ -561,16 +528,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); entityGraphMapper.deleteClassifications(guid, classificationNames); - - // notify listeners on classification deletion - entityChangeNotifier.onClassificationDeletedFromEntity(guid, classificationNames); } @Override @GraphTransaction public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("Getting classifications for entities={}", guid); + LOG.debug("Getting classifications for entity={}", guid); } EntityGraphRetriever graphRetriever = new EntityGraphRetriever(typeRegistry); @@ -680,24 +644,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { type.getNormalizedValue(classification); } - private void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException { - AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName()); - - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName()); - } - - List<String> messages = new ArrayList<>(); - - type.validateValueForUpdate(classification, classification.getTypeName(), messages); - - if (!messages.isEmpty()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages); - } - - type.getNormalizedValueForUpdate(classification); - } - /** * Validate if classification is not already associated with the entities * @@ -734,7 +680,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ret = new ArrayList<>(); for (AtlasClassification classification : classifications) { - ret.add(classification.getTypeName()); + String entityGuid = classification.getEntityGuid(); + + if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) { + ret.add(classification.getTypeName()); + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java index 7389f49..28636d8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java @@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.typedef.AtlasRelationshipDef; @@ -30,6 +31,7 @@ import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.type.AtlasEntityType; @@ -54,10 +56,16 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE; -import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.BOTH; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; +import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getState; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getTypeName; @@ -117,8 +125,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { AtlasEdge edge = graphHelper.getEdgeForGUID(guid); String edgeType = AtlasGraphUtilsV1.getTypeName(edge); - AtlasVertex end1Vertex = edge.getInVertex(); - AtlasVertex end2Vertex = edge.getOutVertex(); + AtlasVertex end1Vertex = edge.getOutVertex(); + AtlasVertex end2Vertex = edge.getInVertex(); // update shouldn't change endType if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) { @@ -302,6 +310,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException { AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName()); + updateTagPropagations(relationshipEdge, relationship.getPropagateTags()); + AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationship.getPropagateTags().name()); if (MapUtils.isNotEmpty(relationType.getAllAttributes())) { @@ -318,6 +328,46 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge); } + private void updateTagPropagations(AtlasEdge relationshipEdge, PropagateTags tagPropagation) throws AtlasBaseException { + PropagateTags oldTagPropagation = getPropagateTags(relationshipEdge); + PropagateTags newTagPropagation = tagPropagation; + + if (newTagPropagation != oldTagPropagation) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating tagPropagation property: [ {} -> {} ] for relationship: [{} --> {}]", oldTagPropagation.name(), + newTagPropagation.name(), getTypeName(relationshipEdge.getOutVertex()), getTypeName(relationshipEdge.getInVertex())); + } + + if (oldTagPropagation == NONE) { + entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); + } else if (oldTagPropagation == ONE_TO_TWO) { + if (newTagPropagation == NONE || newTagPropagation == TWO_TO_ONE) { + entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation); + } + + if (newTagPropagation != NONE) { + entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); + } + } else if (oldTagPropagation == TWO_TO_ONE) { + if (newTagPropagation == NONE || newTagPropagation == ONE_TO_TWO) { + entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation); + } + + if (newTagPropagation != NONE) { + entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); + } + } else if (oldTagPropagation == BOTH) { + if (newTagPropagation == ONE_TO_TWO || newTagPropagation == NONE) { + entityRetriever.removeTagPropagation(relationshipEdge, TWO_TO_ONE); + } + + if (newTagPropagation == TWO_TO_ONE || newTagPropagation == NONE) { + entityRetriever.removeTagPropagation(relationshipEdge, ONE_TO_TWO); + } + } + } + } + private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException { if (relationship == null) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null"); @@ -462,16 +512,20 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { } public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) { - String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType); - AtlasEdge ret = graphHelper.getEdgeForLabel(fromVertex, relationshipLabel); + String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType); + Iterator<AtlasEdge> edgesIterator = getOutGoingEdgesByLabel(fromVertex, relationshipLabel); + AtlasEdge ret = null; - if (ret != null) { - AtlasVertex inVertex = ret.getInVertex(); + while (edgesIterator != null && edgesIterator.hasNext()) { + AtlasEdge edge = edgesIterator.next(); - if (inVertex != null) { - if (!StringUtils.equals(AtlasGraphUtilsV1.getIdFromVertex(inVertex), - AtlasGraphUtilsV1.getIdFromVertex(toVertex))) { - ret = null; + if (edge != null) { + Status status = graphHelper.getStatus(edge); + + if ((status == null || status == ACTIVE) && + StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) { + ret = edge; + break; } } } @@ -499,11 +553,15 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { return ret; } - private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException { + private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException { String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName()); PropagateTags tagPropagation = getRelationshipTagPropagation(fromVertex, toVertex, relationship); AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel); + if (LOG.isDebugEnabled()) { + LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel); + } + // map additional properties to relationship edge if (ret != null) { final String guid = UUID.randomUUID().toString(); @@ -512,6 +570,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { AtlasGraphUtilsV1.setProperty(ret, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid); AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship)); AtlasGraphUtilsV1.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name()); + + // propagate tags + entityRetriever.addTagPropagation(ret, tagPropagation); } return ret; @@ -596,7 +657,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { */ private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) { String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName); - Iterator<AtlasEdge> iter = graphHelper.getAdjacentEdgesByLabel(vertex, BOTH, relationshipEdgeLabel); + Iterator<AtlasEdge> iter = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel); return (iter != null) ? iter.hasNext() : false; } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 779bc38..0224bf0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; @@ -36,6 +37,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -51,6 +53,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.IteratorUtils; import org.apache.commons.collections.MapUtils; @@ -63,14 +66,24 @@ import javax.inject.Inject; import java.util.*; import java.util.stream.Collectors; +import static org.apache.atlas.model.TypeCategory.CLASSIFICATION; import static org.apache.atlas.model.instance.AtlasRelatedObjectId.KEY_RELATIONSHIP_ATTRIBUTES; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET; +import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; +import static org.apache.atlas.repository.graph.GraphHelper.addListProperty; +import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames; import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge; import static org.apache.atlas.repository.graph.GraphHelper.string; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex; @@ -81,21 +94,26 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation public class EntityGraphMapper { private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class); - private final GraphHelper graphHelper = GraphHelper.getInstance(); - private final AtlasGraph graph; - private final DeleteHandlerV1 deleteHandler; - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityRetriever; - private final AtlasRelationshipStore relationshipStore; + private final GraphHelper graphHelper = GraphHelper.getInstance(); + private final AtlasGraph graph; + private final DeleteHandlerV1 deleteHandler; + private final AtlasTypeRegistry typeRegistry; + private final AtlasRelationshipStore relationshipStore; + private final AtlasEntityChangeNotifier entityChangeNotifier; + private final AtlasInstanceConverter instanceConverter; + private final EntityGraphRetriever entityRetriever; @Inject public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, - AtlasRelationshipStore relationshipStore) { - this.deleteHandler = deleteHandler; - this.typeRegistry = typeRegistry; - this.entityRetriever = new EntityGraphRetriever(typeRegistry); - this.graph = atlasGraph; - this.relationshipStore = relationshipStore; + AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier, + AtlasInstanceConverter instanceConverter) { + this.deleteHandler = deleteHandler; + this.typeRegistry = typeRegistry; + this.graph = atlasGraph; + this.relationshipStore = relationshipStore; + this.entityChangeNotifier = entityChangeNotifier; + this.instanceConverter = instanceConverter; + this.entityRetriever = new EntityGraphRetriever(typeRegistry); } public AtlasVertex createVertex(AtlasEntity entity) { @@ -241,6 +259,8 @@ public class EntityGraphMapper { AtlasVertex ret = createStructVertex(classification); AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, classificationType.getAllSuperTypes()); + AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_ENTITY_GUID, classification.getEntityGuid()); + AtlasGraphUtilsV1.setProperty(ret, Constants.CLASSIFICATION_PROPAGATE_KEY, classification.isPropagate()); return ret; } @@ -903,7 +923,7 @@ public class EntityGraphMapper { private void updateModificationMetadata(AtlasVertex vertex) { AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime()); - GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser()); + AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser()); } private Long getEntityVersion(AtlasEntity entity) { @@ -1285,80 +1305,299 @@ public class EntityGraphMapper { } } - public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) - throws AtlasBaseException { + public void addClassifications(final EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classifications)) { + AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid); - if ( CollectionUtils.isNotEmpty(classifications)) { - - AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid); - if (instanceVertex == null) { + if (entityVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex); - - final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex); + final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + List<AtlasVertex> propagatedEntityVertices = null; + List<AtlasClassification> propagagedClassifications = null; for (AtlasClassification classification : classifications) { + String classificationName = classification.getTypeName(); + boolean propagateTags = classification.isPropagate(); + + // set associated entity id to classification + classification.setEntityGuid(guid); + if (LOG.isDebugEnabled()) { - LOG.debug("mapping classification {}", classification); + LOG.debug("Adding classification [{}] to [{}] using edge label: [{}]", classificationName, entityTypeName, getTraitLabel(classificationName)); } - GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, classification.getTypeName()); + GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, classificationName); + // add a new AtlasVertex for the struct or trait instance AtlasVertex classificationVertex = createClassificationVertex(classification); + if (LOG.isDebugEnabled()) { - LOG.debug("created vertex {} for trait {}", string(classificationVertex), classification.getTypeName()); + LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName); } // add the attributes for the trait instance - mapClassification(EntityOperation.CREATE, context, classification, entityType, instanceVertex, classificationVertex); + mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex); + + if (propagateTags) { + // compute propagatedEntityVertices only once + if (propagatedEntityVertices == null) { + propagatedEntityVertices = graphHelper.getImpactedVertices(guid); + } + + if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(propagatedEntityVertices)); + } + + if (propagagedClassifications == null) { + propagagedClassifications = new ArrayList<>(); + } + + propagagedClassifications.add(classification); + + addTagPropagation(classificationVertex, propagatedEntityVertices); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Not propagating classification: [{}][{}] - no entities found to propagate to.", getTypeName(classificationVertex), entityTypeName); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Not propagating classification: [{}][{}] - propagation is disabled.", getTypeName(classificationVertex), entityTypeName); + } + } + } + + // notify listeners on classification addition + List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }}; + + if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) { + notificationVertices.addAll(propagatedEntityVertices); + } + + for (AtlasVertex vertex : notificationVertices) { + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + List<AtlasClassification> addedClassifications = StringUtils.equals(entityGuid, guid) ? classifications : Collections.emptyList(); + + entityChangeNotifier.onClassificationAddedToEntity(entity, addedClassifications); } } } - public void updateClassification(final EntityMutationContext context, String guid, AtlasClassification classification) - throws AtlasBaseException { + public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classificationNames)) { + AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid); - AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid); + if (entityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } - if (instanceVertex == null) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); - } + List<String> traitNames = getTraitNames(entityVertex); - String entityTypeName = AtlasGraphUtilsV1.getTypeName(instanceVertex); + validateClassificationExists(traitNames, classificationNames); - final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + Set<String> impactedEntities = new HashSet<String>() {{ add(guid); }}; - if (LOG.isDebugEnabled()) { - LOG.debug("Updating classification {} for entity {}", classification, guid); + for (String classificationName : classificationNames) { + AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, getTraitLabel(classificationName)); + AtlasVertex classificationVertex = classificationEdge.getInVertex(); + + // remove classification from propagated entity vertices + boolean propagationEnabled = entityRetriever.isPropagationEnabled(classificationVertex); + + if (propagationEnabled) { + List<AtlasVertex> impactedEntityVertices = graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex); + + if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { + removeTagPropagation(classificationVertex); + + for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { + impactedEntities.add(GraphHelper.getGuid(impactedEntityVertex)); + } + } + } + + // remove classification from associated entity vertex + if (LOG.isDebugEnabled()) { + LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, getTypeName(entityVertex), guid, getTraitLabel(classificationName)); + } + + deleteHandler.deleteEdgeReference(classificationEdge, CLASSIFICATION, false, true, entityVertex); + + traitNames.remove(classificationName); + } + + updateTraitNamesProperty(entityVertex, traitNames); + + updateModificationMetadata(entityVertex); + + for (String entityGuid : impactedEntities) { + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + List<String> deletedClassificationNames = StringUtils.equals(entityGuid, guid) ? classificationNames : Collections.emptyList(); + + entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); + } } + } + + public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classifications)) { + AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid); + + if (entityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } + + String entityTypeName = AtlasGraphUtilsV1.getTypeName(entityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + List<AtlasClassification> updatedClassifications = new ArrayList<>(); + List<AtlasVertex> propagatedEntityVertices = new ArrayList<>(); + + for (AtlasClassification classification : classifications) { + String classificationName = classification.getTypeName(); + String classificationEntityGuid = classification.getEntityGuid(); + + if (StringUtils.isNotEmpty(classificationEntityGuid) && StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName); + } + + String relationshipLabel = getTraitLabel(entityTypeName, classificationName); + AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(entityVertex, relationshipLabel); + + if (classificationEdge == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating classification {} for entity {}", classification, guid); + } + + AtlasVertex classificationVertex = classificationEdge.getInVertex(); + AtlasClassification currentClassification = entityRetriever.toAtlasClassification(classificationVertex); + + validateAndNormalizeForUpdate(classification); + + Map<String, Object> classificationAttributes = classification.getAttributes(); + + if (MapUtils.isNotEmpty(classificationAttributes)) { + for (String attributeName : classificationAttributes.keySet()) { + currentClassification.setAttribute(attributeName, classificationAttributes.get(attributeName)); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classificationName); + } + + mapClassification(EntityOperation.UPDATE, context, classification, entityType, entityVertex, classificationVertex); + + // handle update of 'propagate' flag + boolean currentTagPropagation = currentClassification.isPropagate(); + boolean updatedTagPropagation = classification.isPropagate(); + + // compute propagatedEntityVertices once and use it for subsequent iterations and notifications + if (CollectionUtils.isEmpty(propagatedEntityVertices)) { + propagatedEntityVertices = (currentTagPropagation) ? graphHelper.getPropagatedEntityVerticesFromClassification(classificationVertex) : + graphHelper.getImpactedVertices(guid); + } + + if (currentTagPropagation != updatedTagPropagation) { + if (updatedTagPropagation) { + addTagPropagation(classificationVertex, propagatedEntityVertices); + } else { + removeTagPropagation(classificationVertex); + } - // get the classification vertex from entity - String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, classification.getTypeName()); - AtlasEdge classificationEdge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); + AtlasGraphUtilsV1.setProperty(classificationVertex, Constants.CLASSIFICATION_PROPAGATE_KEY, updatedTagPropagation); + } + + updatedClassifications.add(currentClassification); + } - if (classificationEdge == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "classificationEdge is null for label: " + relationshipLabel); + // notify listeners on classification update + List<AtlasVertex> notificationVertices = new ArrayList<AtlasVertex>() {{ add(entityVertex); }}; + + if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) { + notificationVertices.addAll(propagatedEntityVertices); + } + + for (AtlasVertex vertex : notificationVertices) { + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList(); + + entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList); + } } + } - AtlasVertex classificationVertex = classificationEdge.getInVertex(); + private void addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) { + if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) { + String classificationName = getTypeName(classificationVertex); + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); - if (LOG.isDebugEnabled()) { - LOG.debug("updating vertex {} for trait {}", string(classificationVertex), classification.getTypeName()); + for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) { + String entityTypeName = getTypeName(propagatedEntityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + if (classificationType.canApplyToEntityType(entityType)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex), + GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName)); + } + + graphHelper.addEdge(propagatedEntityVertex, classificationVertex, getPropagatedEdgeLabel(classificationName)); + + addListProperty(propagatedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName); + } + } } + } - mapClassification(EntityOperation.UPDATE, context, classification, entityType, instanceVertex, classificationVertex); + private void removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException { + if (classificationVertex != null) { + String classificationName = getTypeName(classificationVertex); + Iterator<AtlasEdge> iterator = getIncomingEdgesByLabel(classificationVertex, getPropagatedEdgeLabel(classificationName)); + + // remove classification from propagated entity vertices + while (iterator != null && iterator.hasNext()) { + AtlasEdge propagatedEdge = iterator.next(); + + if (propagatedEdge != null) { + AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, + getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), getPropagatedEdgeLabel(classificationName)); + } + + removePropagatedTraitName(propagatedEntityVertex, classificationName); + + deleteHandler.deleteEdge(propagatedEdge, true); + + updateModificationMetadata(propagatedEntityVertex); + } + } + } } - private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex) - throws AtlasBaseException { + private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, + AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex) + throws AtlasBaseException { // map all the attributes to this newly created AtlasVertex mapAttributes(classification, traitInstanceVertex, operation, context); // add an edge to the newly created AtlasVertex from the parent - String relationshipLabel = GraphHelper.getTraitLabel(entityType.getTypeName(), classification.getTypeName()); + String relationshipLabel = getTraitLabel(entityType.getTypeName(), classification.getTypeName()); + try { return graphHelper.getOrCreateEdge(parentInstanceVertex, traitInstanceVertex, relationshipLabel); } catch (RepositoryException e) { @@ -1373,54 +1612,40 @@ public class EntityGraphMapper { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - List<String> traitNames = GraphHelper.getTraitNames(instanceVertex); + List<String> traitNames = getTraitNames(instanceVertex); deleteClassifications(guid, traitNames); } - public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException { - - AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid); - if (instanceVertex == null) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); - } + private void removePropagatedTraitName(AtlasVertex entityVertex, String classificationName) { + if (entityVertex != null && StringUtils.isNotEmpty(classificationName)) { + List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex); - List<String> traitNames = GraphHelper.getTraitNames(instanceVertex); + propagatedTraitNames.remove(classificationName); - validateClassificationExists(traitNames, classificationNames); + entityVertex.removeProperty(PROPAGATED_TRAIT_NAMES_PROPERTY_KEY); - for (String classificationName : classificationNames) { - try { - final String entityTypeName = getTypeName(instanceVertex); - String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, classificationName); - AtlasEdge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); - if (edge != null) { - deleteHandler.deleteEdgeReference(edge, TypeCategory.CLASSIFICATION, false, true, instanceVertex); - - // update the traits in entity once trait removal is successful - traitNames.remove(classificationName); - - } - } catch (Exception e) { - throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + for (String propagatedTraitName : propagatedTraitNames) { + addListProperty(entityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, propagatedTraitName); } } + } - // remove the key - instanceVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY); + private void updateTraitNamesProperty(AtlasVertex entityVertex, List<String> traitNames) { + if (entityVertex != null) { + entityVertex.removeProperty(TRAIT_NAMES_PROPERTY_KEY); - // add it back again - for (String traitName : traitNames) { - GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName); + for (String traitName : traitNames) { + GraphHelper.addProperty(entityVertex, TRAIT_NAMES_PROPERTY_KEY, traitName); + } } - updateModificationMetadata(instanceVertex); } private void validateClassificationExists(List<String> existingClassifications, List<String> suppliedClassifications) throws AtlasBaseException { Set<String> existingNames = new HashSet<>(existingClassifications); for (String classificationName : suppliedClassifications) { if (!existingNames.contains(classificationName)) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName); } } } @@ -1489,4 +1714,22 @@ public class EntityGraphMapper { return currentEntityId; } + + public void validateAndNormalizeForUpdate(AtlasClassification classification) throws AtlasBaseException { + AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName()); + } + + List<String> messages = new ArrayList<>(); + + type.validateValueForUpdate(classification, classification.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages); + } + + type.getNormalizedValueForUpdate(classification); + } }
