Repository: atlas Updated Branches: refs/heads/master bdf16a5f8 -> 967bf67eb
ATLAS-2643: Re-evaluate tag propagation when a relationship edge is deleted Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/967bf67e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/967bf67e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/967bf67e Branch: refs/heads/master Commit: 967bf67eb6858c91c97b13e2b1c60caa1d51299c Parents: bdf16a5 Author: Sarath Subramanian <[email protected]> Authored: Fri May 4 01:03:44 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Fri May 4 01:03:44 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/graph/GraphHelper.java | 32 ++++++++++++- .../graph/v1/AtlasRelationshipStoreV1.java | 29 +----------- .../store/graph/v1/DeleteHandlerV1.java | 48 +++++++++++++++++++- .../store/graph/v1/HardDeleteHandlerV1.java | 3 ++ .../store/graph/v1/SoftDeleteHandlerV1.java | 3 ++ .../atlas/util/AtlasGremlin3QueryProvider.java | 12 ++++- .../atlas/util/AtlasGremlinQueryProvider.java | 3 +- 7 files changed, 97 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 86a6b81..edf10da 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -86,6 +86,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL; import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS; @@ -879,15 +880,24 @@ public final class GraphHelper { } public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId) throws AtlasBaseException { + return getImpactedVerticesWithRestrictions(guid, classificationId, null); + } + + public List<AtlasVertex> getImpactedVerticesWithRestrictions(String guid, String classificationId, String guidRelationshipToExclude) throws AtlasBaseException { ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); Bindings bindings = scriptEngine.createBindings(); - String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS); List<AtlasVertex> ret = new ArrayList<>(); + String query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS); bindings.put("g", graph); bindings.put("guid", guid); bindings.put("classificationId", classificationId); + if (guidRelationshipToExclude != null) { + query = queryProvider.getQuery(TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP); + bindings.put("guidRelationshipToExclude", guidRelationshipToExclude); + } + try { Object resultObj = graph.executeGremlinScript(scriptEngine, bindings, query, false); @@ -1013,6 +1023,26 @@ public final class GraphHelper { return ret; } + public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException { + return getClassificationPropagatedEntitiesMapping(classificationVertices, null); + } + + public Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices, String guidRelationshipToExclude) throws AtlasBaseException { + Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>(); + + if (CollectionUtils.isNotEmpty(classificationVertices)) { + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationId = classificationVertex.getIdForDisplay(); + String sourceEntityId = getClassificationEntityGuid(classificationVertex); + List<AtlasVertex> entitiesPropagatingTo = getImpactedVerticesWithRestrictions(sourceEntityId, classificationId, guidRelationshipToExclude); + + ret.put(classificationVertex, entitiesPropagatingTo); + } + } + + return ret; + } + public static List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex entityVertex) { List<AtlasVertex> ret = new ArrayList<>(); Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL).edges(); http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java index cf6f72f..9fcba6d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java @@ -252,17 +252,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid); } - // remove tag propagations - List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge); - deleteHandler.deleteRelationships(Collections.singleton(edge)); - for (AtlasVertex classificationVertex : propagatedClassificationVertices) { - List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); - - deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices); - } - // notify entities for added/removed classification propagation entityChangeNotifier.notifyPropagatedEntities(); @@ -460,14 +451,14 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { if (newTagPropagation != oldTagPropagation) { List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge); - Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = getClassificationPropagatedEntitiesMapping(currentClassificationVertices); + Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices); // Update propagation edge AtlasGraphUtilsV1.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name()); List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge); List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices); - Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = getClassificationPropagatedEntitiesMapping(classificationVerticesUnion); + Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion); // compute add/remove propagations list Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>(); @@ -510,22 +501,6 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { } } - private Map<AtlasVertex, List<AtlasVertex>> getClassificationPropagatedEntitiesMapping(List<AtlasVertex> classificationVertices) throws AtlasBaseException { - Map<AtlasVertex, List<AtlasVertex>> ret = new HashMap<>(); - - if (CollectionUtils.isNotEmpty(classificationVertices)) { - for (AtlasVertex classificationVertex : classificationVertices) { - String classificationId = classificationVertex.getIdForDisplay(); - String sourceEntityId = getClassificationEntityGuid(classificationVertex); - List<AtlasVertex> entitiesPropagatingTo = graphHelper.getImpactedVerticesWithRestrictions(sourceEntityId, classificationId); - - ret.put(classificationVertex, entitiesPropagatingTo); - } - } - - return ret; - } - private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException { if (relationship == null) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null"); http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index d79d914..39469b9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -44,6 +44,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdg import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags. import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames; @@ -63,9 +65,9 @@ import static org.apache.atlas.repository.graph.GraphHelper.getAllClassification import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState; -import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices; import static org.apache.atlas.repository.graph.GraphHelper.getGuid; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges; @@ -74,7 +76,6 @@ import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid; import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.graph.GraphHelper.isPropagatedClassificationEdge; -import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge; import static org.apache.atlas.repository.graph.GraphHelper.string; import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromEdge; @@ -438,6 +439,49 @@ public abstract class DeleteHandlerV1 { return ret; } + public void removeTagPropagation(AtlasEdge edge) throws AtlasBaseException { + if (edge == null || !isRelationshipEdge(edge)) { + return; + } + + List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge); + Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices); + Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices, getRelationshipGuid(edge)); + Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>(); + + if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) { + removePropagationsMap.putAll(currentClassificationsMap); + } else { + for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) { + List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList(); + List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList(); + List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities); + + if (CollectionUtils.isNotEmpty(entitiesRemoved)) { + removePropagationsMap.put(classificationVertex, entitiesRemoved); + } + } + } + + for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { + removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex)); + } + } + + public boolean isRelationshipEdge(AtlasEdge edge) { + boolean ret = false; + + if (edge != null) { + String outVertexType = getTypeName(edge.getOutVertex()); + String inVertexType = getTypeName(edge.getInVertex()); + + ret = GraphHelper.isRelationshipEdge(edge) || edge.getPropertyKeys().contains(RELATIONSHIP_GUID_PROPERTY_KEY) || + (typeRegistry.getEntityTypeByName(outVertexType) != null && typeRegistry.getEntityTypeByName(inVertexType) != null); + } + + return ret; + } + public List<AtlasVertex> removeTagPropagation(AtlasVertex classificationVertex) throws AtlasBaseException { List<AtlasVertex> ret = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java index edf1eed..a95e689 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java @@ -52,6 +52,9 @@ public class HardDeleteHandlerV1 extends DeleteHandlerV1 { LOG.debug("==> HardDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force); } + // re-evaluate tag propagation + removeTagPropagation(edge); + graphHelper.removeEdge(edge); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java index 83c6e07..2c921fc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java @@ -68,6 +68,9 @@ public class SoftDeleteHandlerV1 extends DeleteHandlerV1 { LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})",GraphHelper.string(edge), force); } + // re-evaluate tag propagation + removeTagPropagation(edge); + if (force) { graphHelper.removeEdge(edge); } else { http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java index 8555f70..9585a57 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java @@ -70,15 +70,23 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { case TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS: return "g.V().has('__guid', guid).aggregate('src')" + - ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " + + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" + - ".dedup().where(without('src')).simplePath()).emit().toList();"; + ".dedup().where(without('src')).simplePath()).emit().toList();"; case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL: return "g.V().has('__guid', guid).aggregate('src')" + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(relationshipGuid)).outV())" + ".dedup().where(without('src')).simplePath()).emit().toList();"; + + case TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP: + return "g.V().has('__guid', guid).aggregate('src')" + + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" + + ".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).inV(), " + + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(guidRelationshipToExclude))" + + ".not(has('blockedPropagatedClassifications', org.janusgraph.core.attribute.Text.textContains(classificationId))).outV())" + + ".dedup().where(without('src')).simplePath()).emit().toList();"; } return super.getQuery(gremlinQuery); } http://git-wip-us.apache.org/repos/asf/atlas/blob/967bf67e/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index e2992f1..8555b4c 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -86,6 +86,7 @@ public abstract class AtlasGremlinQueryProvider { TAG_PROPAGATION_IMPACTED_INSTANCES, TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL, - TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS + TAG_PROPAGATION_IMPACTED_INSTANCES_WITH_RESTRICTIONS, + TAG_PROPAGATION_IMPACTED_INSTANCES_EXCLUDE_RELATIONSHIP } }
