Repository: atlas Updated Branches: refs/heads/master f622751db -> 8063de4ad
ATLAS-2558: Reevaluate propagated tags based on blocked classifications in relationship edge Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8063de4a Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8063de4a Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8063de4a Branch: refs/heads/master Commit: 8063de4ad4d5cc70d15a648080bdc9e5004a1446 Parents: f622751 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Sun Apr 15 23:33:38 2018 -0700 Committer: Sarath Subramanian <ssubraman...@hortonworks.com> Committed: Sun Apr 15 23:33:38 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/graph/GraphHelper.java | 116 ++++++++++++++ .../graph/v1/AtlasRelationshipStoreV1.java | 157 +++++++++++++------ .../store/graph/v1/DeleteHandlerV1.java | 91 +++++++++-- .../store/graph/v1/EntityGraphMapper.java | 51 +----- .../store/graph/v1/EntityGraphRetriever.java | 63 +++----- .../atlas/util/AtlasGremlin3QueryProvider.java | 6 + .../atlas/util/AtlasGremlinQueryProvider.java | 3 +- 7 files changed, 339 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index ae22e7a..48db657 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -79,6 +79,7 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PR import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH; @@ -86,6 +87,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS; /** * Utility class for graph operations. @@ -479,6 +481,22 @@ public final class GraphHelper { return ret; } + public static List<AtlasVertex> getAllPropagatedEntityVertices(AtlasVertex classificationVertex) { + List<AtlasVertex> ret = new ArrayList<>(); + + if (classificationVertex != null) { + List<AtlasEdge> edges = getPropagatedEdges(classificationVertex); + + if (CollectionUtils.isNotEmpty(edges)) { + for (AtlasEdge edge : edges) { + ret.add(edge.getOutVertex()); + } + } + } + + return ret; + } + public static Iterator<AtlasEdge> getIncomingEdgesByLabel(AtlasVertex instanceVertex, String edgeLabel) { return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel); } @@ -849,6 +867,53 @@ public final class GraphHelper { return ret; } + public List<AtlasVertex> getPropagatedEntityVertices(AtlasVertex classificationVertex) throws AtlasBaseException { + List<AtlasVertex> ret = new ArrayList<>(); + + if (classificationVertex != null) { + String entityGuid = getClassificationEntityGuid(classificationVertex); + String classificationId = classificationVertex.getIdForDisplay(); + List<AtlasVertex> impactedEntityVertices = getAllPropagatedEntityVertices(classificationVertex); + List<AtlasVertex> impactedEntityVerticesWithRestrictions = getImpactedVerticesWithRestrictions(entityGuid, classificationId); + + if (impactedEntityVertices.size() > impactedEntityVerticesWithRestrictions.size()) { + ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVertices, impactedEntityVerticesWithRestrictions); + } else { + ret = (List<AtlasVertex>) CollectionUtils.subtract(impactedEntityVerticesWithRestrictions, impactedEntityVertices); + } + } + + return ret; + } + + public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId) throws AtlasBaseException { + ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); + Bindings bindings = scriptEngine.createBindings(); + String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS); + List<AtlasVertex> ret = new ArrayList<>(); + + bindings.put("g", graph); + bindings.put("guid", guid); + bindings.put("classificationId", classificationId); + + try { + Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false); + + if (resultObj instanceof List && CollectionUtils.isNotEmpty((List) resultObj)) { + List<?> results = (List) resultObj; + Object firstElement = results.get(0); + + if (firstElement instanceof AtlasVertex) { + ret = (List<AtlasVertex>) results; + } + } + } catch (ScriptException e) { + throw new AtlasBaseException(AtlasErrorCode.GREMLIN_SCRIPT_EXECUTION_FAILED, e); + } + + return ret; + } + public List<AtlasVertex> getImpactedVerticesWithReferences(String guid, String relationshipGuid) throws AtlasBaseException { ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); Bindings bindings = scriptEngine.createBindings(); @@ -936,6 +1001,49 @@ public final class GraphHelper { return ret; } + public static List<AtlasVertex> getClassificationVertices(AtlasEdge edge) { + List<AtlasVertex> ret = new ArrayList<>(); + + if (edge != null) { + PropagateTags propagateTags = getPropagateTags(edge); + AtlasVertex outVertex = edge.getOutVertex(); + AtlasVertex inVertex = edge.getInVertex(); + + if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { + ret.addAll(getPropagationEnabledClassificationVertices(outVertex)); + } + + if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { + ret.addAll(getPropagationEnabledClassificationVertices(inVertex)); + } + } + + return ret; + } + + public static List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) { + List<AtlasVertex> ret = new ArrayList<>(); + Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges(); + + if (edges != null) { + Iterator<AtlasEdge> iterator = edges.iterator(); + + while (iterator.hasNext()) { + AtlasEdge edge = iterator.next(); + + if (edge != null) { + AtlasVertex classificationVertex = edge.getInVertex(); + + if (isPropagationEnabled(classificationVertex)) { + ret.add(classificationVertex); + } + } + } + } + + return ret; + } + public static List<AtlasEdge> getClassificationEdges(AtlasVertex entityVertex) { return getClassificationEdges(entityVertex, false); } @@ -1033,6 +1141,14 @@ public final class GraphHelper { return (getState(element) == Id.EntityState.DELETED) ? AtlasRelationship.Status.DELETED : AtlasRelationship.Status.ACTIVE; } + public static String getClassificationName(AtlasVertex classificationVertex) { + return AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_VERTEX_NAME_KEY, String.class); + } + + public static String getClassificationEntityGuid(AtlasVertex classificationVertex) { + return AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class); + } + public static AtlasClassification.PropagationState getClassificationEdgeState(AtlasEdge edge) { AtlasClassification.PropagationState ret = null; http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java index cc59c36..70877f3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java @@ -41,6 +41,7 @@ import org.apache.atlas.type.AtlasRelationshipType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -50,6 +51,7 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -60,12 +62,12 @@ import java.util.UUID; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; -import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE; -import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY; +import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices; import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel; import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex; @@ -334,34 +336,75 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { private void handleBlockedClassifications(AtlasEdge edge, List<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException { if (blockedPropagatedClassifications != null) { - List<AtlasVertex> propagatedClassificationVertices = blockedPropagatedClassifications.isEmpty() ? null : entityRetriever.getClassificationVertices(edge); - List<String> classificationIds = new ArrayList<>(); + List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge); + List<String> currentClassificationIds = getBlockedClassificationIds(edge); + List<AtlasVertex> currentBlockedPropagatedClassificationVertices = getBlockedClassificationVertices(propagatedClassificationVertices, currentClassificationIds); + List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = new ArrayList<>(); + List<String> updatedClassificationIds = new ArrayList<>(); for (AtlasClassification classification : blockedPropagatedClassifications) { - String classificationId = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification); + AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification); // ignore invalid blocked propagated classification - if (classificationId == null) { + if (classificationVertex == null) { continue; } - classificationIds.add(classificationId); + updatedBlockedPropagatedClassificationVertices.add(classificationVertex); + + String classificationId = classificationVertex.getIdForDisplay(); + + updatedClassificationIds.add(classificationId); + } + + addToBlockedClassificationIds(edge, updatedClassificationIds); + + // remove propagated tag for added entry + List<AtlasVertex> addedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, currentBlockedPropagatedClassificationVertices); + + for (AtlasVertex classificationVertex : addedBlockedClassifications) { + List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); + + deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices); + } + + // add propagated tag for removed entry + List<AtlasVertex> removedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, updatedBlockedPropagatedClassificationVertices); + + for (AtlasVertex classificationVertex : removedBlockedClassifications) { + List<AtlasVertex> addPropagationToVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); + + deleteHandler.addTagPropagation(classificationVertex, addPropagationToVertices); } + } + } + + private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) { + List<AtlasVertex> ret = new ArrayList<>(); - addToBlockedClassificationIds(edge, classificationIds); + if (CollectionUtils.isNotEmpty(blockedClassificationIds)) { + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationId = classificationVertex.getIdForDisplay(); + + if (blockedClassificationIds.contains(classificationId)) { + ret.add(classificationVertex); + } + } } + + return ret; } // propagated classifications should contain blocked propagated classification - private String validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) throws AtlasBaseException { - String ret = null; + private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) { + AtlasVertex ret = null; for (AtlasVertex vertex : classificationVertices) { - String classificationName = AtlasGraphUtilsV1.getProperty(vertex, CLASSIFICATION_VERTEX_NAME_KEY, String.class); - String entityGuid = AtlasGraphUtilsV1.getProperty(vertex, CLASSIFICATION_ENTITY_GUID, String.class); + String classificationName = getClassificationName(vertex); + String entityGuid = getClassificationEntityGuid(vertex); if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) { - ret = vertex.getIdForDisplay(); + ret = vertex; break; } } @@ -379,46 +422,72 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { } } - private void updateTagPropagations(AtlasEdge relationshipEdge, PropagateTags tagPropagation) throws AtlasBaseException { - PropagateTags oldTagPropagation = getPropagateTags(relationshipEdge); + private void updateTagPropagations(AtlasEdge edge, PropagateTags tagPropagation) throws AtlasBaseException { + PropagateTags oldTagPropagation = getPropagateTags(edge); PropagateTags newTagPropagation = tagPropagation; if (newTagPropagation != oldTagPropagation) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updating tagPropagation property: [ {} -> {} ] for relationship: [{} --> {}]", oldTagPropagation.name(), - newTagPropagation.name(), getTypeName(relationshipEdge.getOutVertex()), getTypeName(relationshipEdge.getInVertex())); - } + List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge); + Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = getClassificationPropagatedEntitiesMapping(currentClassificationVertices); - if (oldTagPropagation == NONE) { - entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); - } else if (oldTagPropagation == ONE_TO_TWO) { - if (newTagPropagation == NONE || newTagPropagation == TWO_TO_ONE) { - entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation); - } + // Update propagation edge + AtlasGraphUtilsV1.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name()); - if (newTagPropagation != NONE) { - entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); - } - } else if (oldTagPropagation == TWO_TO_ONE) { - if (newTagPropagation == NONE || newTagPropagation == ONE_TO_TWO) { - entityRetriever.removeTagPropagation(relationshipEdge, oldTagPropagation); - } + List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge); + List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices); + Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = getClassificationPropagatedEntitiesMapping(classificationVerticesUnion); - if (newTagPropagation != NONE) { - entityRetriever.addTagPropagation(relationshipEdge, newTagPropagation); - } - } else if (oldTagPropagation == BOTH) { - if (newTagPropagation == ONE_TO_TWO || newTagPropagation == NONE) { - entityRetriever.removeTagPropagation(relationshipEdge, TWO_TO_ONE); - } + // compute add/remove propagations list + Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>(); + Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>(); + + if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) { + addPropagationsMap.putAll(updatedClassificationsMap); + + } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) { + removePropagationsMap.putAll(currentClassificationsMap); - if (newTagPropagation == TWO_TO_ONE || newTagPropagation == NONE) { - entityRetriever.removeTagPropagation(relationshipEdge, ONE_TO_TWO); + } else { + for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) { + List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.get(classificationVertex); + List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.get(classificationVertex); + List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities); + List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities); + + if (CollectionUtils.isNotEmpty(entitiesAdded)) { + addPropagationsMap.put(classificationVertex, entitiesAdded); + } + + if (CollectionUtils.isNotEmpty(entitiesRemoved)) { + removePropagationsMap.put(classificationVertex, entitiesRemoved); + } } } - AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name()); + for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) { + deleteHandler.addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex)); + } + + for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { + deleteHandler.removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex)); + } + } + } + + private Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException { + Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>(); + + if (CollectionUtils.isNotEmpty(classificationVertices)) { + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationId = classificationVertex.getIdForDisplay(); + String sourceEntityId = getClassificationEntityGuid(classificationVertex); + List<AtlasVertex> entitiesPropagatingTo = graphHelper.getImpactedVerticesWithRestrictions(sourceEntityId, classificationId); + + ret.put(classificationVertex, entitiesPropagatingTo); + } } + + return ret; } private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 2b6863a..c00015e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasMapType; import org.apache.atlas.type.AtlasStructType; @@ -49,6 +50,7 @@ import java.util.*; import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; @@ -56,6 +58,9 @@ import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges; import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; @@ -353,36 +358,94 @@ public abstract class DeleteHandlerV1 { } } + public List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) { + List<AtlasVertex> ret = null; + + if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) { + String classificationName = getTypeName(classificationVertex); + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); + + for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) { + AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex); + + if (existingEdge != null) { + continue; + } + + String entityTypeName = getTypeName(propagatedEntityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + if (classificationType.canApplyToEntityType(entityType)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex), + GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL); + } + + if (ret == null) { + ret = new ArrayList<>(); + } + + ret.add(propagatedEntityVertex); + + graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true); + + addToPropagatedTraitNames(propagatedEntityVertex, classificationName); + } + } + } + + return ret; + } + public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException { List<AtlasVertex> ret = new ArrayList<>(); if (classificationVertex != null) { - String classificationName = getTypeName(classificationVertex); - List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex); + List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex); if (CollectionUtils.isNotEmpty(propagatedEdges)) { for (AtlasEdge propagatedEdge : propagatedEdges) { - AtlasVertex propagatedEntityVertex = propagatedEdge.getOutVertex(); + deletePropagatedEdge(propagatedEdge); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, - getTypeName(propagatedEntityVertex), GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL); - } + ret.add(propagatedEdge.getOutVertex()); + } + } + } - if (getClassificationEdgeState(propagatedEdge) == ACTIVE) { - removeFromPropagatedTraitNames(propagatedEntityVertex, classificationName); - } + return ret; + } - deleteEdge(propagatedEdge, true); + public void removeTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> entityVertices) throws AtlasBaseException { + if (classificationVertex != null && CollectionUtils.isNotEmpty(entityVertices)) { + String classificationName = getClassificationName(classificationVertex); + String entityGuid = getClassificationEntityGuid(classificationVertex); - updateModificationMetadata(propagatedEntityVertex); + for (AtlasVertex entityVertex : entityVertices) { + AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, entityGuid); - ret.add(propagatedEntityVertex); + if (propagatedEdge != null) { + deletePropagatedEdge(propagatedEdge); } } } + } - return ret; + public void deletePropagatedEdge(AtlasEdge edge) throws AtlasBaseException { + String classificationName = AtlasGraphUtilsV1.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class); + AtlasVertex entityVertex = edge.getOutVertex(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing propagated classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, + getTypeName(entityVertex), GraphHelper.getGuid(entityVertex), CLASSIFICATION_LABEL); + } + + if (getClassificationEdgeState(edge) == ACTIVE) { + removeFromPropagatedTraitNames(entityVertex, classificationName); + } + + deleteEdge(edge, true); + + updateModificationMetadata(entityVertex); } private void removeFromPropagatedTraitNames(AtlasVertex entityVertex, String classificationName) { http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 2d8b153..564567d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -1362,7 +1362,7 @@ public class EntityGraphMapper { LOG.debug("Propagating tag: [{}][{}] to {}", classificationName, entityTypeName, getTypeNames(entitiesToPropagateTo)); } - List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo); + List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo); if (entitiesPropagatedTo != null) { for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) { @@ -1427,7 +1427,7 @@ public class EntityGraphMapper { // remove classification from propagated entities if propagation is turned on if (isPropagationEnabled(classificationVertex)) { - List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex); + List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex); if (CollectionUtils.isNotEmpty(impactedVertices)) { for (AtlasVertex impactedVertex : impactedVertices) { @@ -1567,7 +1567,7 @@ public class EntityGraphMapper { } } - List<AtlasVertex> entitiesPropagatedTo = addTagPropagation(classificationVertex, entitiesToPropagateTo); + List<AtlasVertex> entitiesPropagatedTo = deleteHandler.addTagPropagation(classificationVertex, entitiesToPropagateTo); if (entitiesPropagatedTo != null) { for (AtlasVertex entityPropagatedTo : entitiesPropagatedTo) { @@ -1576,7 +1576,7 @@ public class EntityGraphMapper { } } } else { - List<AtlasVertex> impactedVertices = removeTagPropagation(classificationVertex); + List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex); if (CollectionUtils.isNotEmpty(impactedVertices)) { if (removedPropagations == null) { @@ -1670,49 +1670,6 @@ public class EntityGraphMapper { } } - private List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) { - List<AtlasVertex> ret = null; - - if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) { - String classificationName = getTypeName(classificationVertex); - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); - - for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) { - AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex); - - if (existingEdge != null) { - continue; - } - - String entityTypeName = getTypeName(propagatedEntityVertex); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); - - if (classificationType.canApplyToEntityType(entityType)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex), - GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL); - } - - if (ret == null) { - ret = new ArrayList<>(); - } - - ret.add(propagatedEntityVertex); - - graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true); - - addToPropagatedTraitNames(propagatedEntityVertex, classificationName); - } - } - } - - return ret; - } - - private List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException { - return deleteHandler.removeTagPropagation(classificationVertex); - } - private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex) throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index f8d967e..57c9135 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -81,7 +81,27 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY; import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL; -import static org.apache.atlas.repository.graph.GraphHelper.*; +import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; +import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges; +import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex; +import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagationEnabledClassificationVertices; +import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; +import static org.apache.atlas.repository.graph.GraphHelper.isPropagatedClassificationEdge; +import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled; +import static org.apache.atlas.repository.graph.GraphHelper.removeFromPropagatedTraitNames; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH; @@ -455,29 +475,6 @@ public final class EntityGraphRetriever { } } - public List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) { - List<AtlasVertex> ret = new ArrayList<>(); - Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges(); - - if (edges != null) { - Iterator<AtlasEdge> iterator = edges.iterator(); - - while (iterator.hasNext()) { - AtlasEdge edge = iterator.next(); - - if (edge != null) { - AtlasVertex classificationVertex = edge.getInVertex(); - - if (isPropagationEnabled(classificationVertex)) { - ret.add(classificationVertex); - } - } - } - } - - return ret; - } - public List<AtlasClassification> getAllClassifications(AtlasVertex entityVertex) throws AtlasBaseException { List<AtlasClassification> ret = new ArrayList<>(); Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges(); @@ -1033,24 +1030,6 @@ public final class EntityGraphRetriever { relationship.setBlockedPropagatedClassifications(blockedClassifications); } - public List<AtlasVertex> getClassificationVertices(AtlasEdge edge) { - List<AtlasVertex> ret = new ArrayList<>(); - - if (edge != null) { - PropagateTags propagateTags = getPropagateTags(edge); - - if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { - ret.addAll(getPropagationEnabledClassificationVertices(edge.getOutVertex())); - } - - if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { - ret.addAll(getPropagationEnabledClassificationVertices(edge.getInVertex())); - } - } - - return ret; - } - private void mapAttributes(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException { AtlasType objType = typeRegistry.getType(relationship.getTypeName()); http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java index 8828a87..72b7261 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java @@ -66,6 +66,12 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" + ".dedup().where(without('src')).simplePath()).emit().toList();"; + case TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS: + return "g.V().has('__guid', guid).aggregate('src')" + + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " + + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" + + ".dedup().where(without('src')).simplePath()).emit().toList();"; + case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL: return "g.V().has('__guid', guid).aggregate('src')" + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " + http://git-wip-us.apache.org/repos/asf/atlas/blob/8063de4a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index 3e3ee11..cca80b5 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -84,6 +84,7 @@ public abstract class AtlasGremlinQueryProvider { COMPARE_NOT_NULL, TAG_PROPAGATION_IMPACTED_INSTANCES, - TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL + TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL, + TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS } }