Repository: atlas Updated Branches: refs/heads/master 9c58d30c7 -> a3374c747
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index b05a9a3..d01fb9f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -30,15 +30,18 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags; import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasMapType; import org.apache.atlas.type.AtlasRelationshipType; @@ -58,13 +61,32 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_PROPAGATE_KEY; +import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; +import static org.apache.atlas.repository.graph.GraphHelper.edgeExists; +import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdgeLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel; +import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; +import static org.apache.atlas.repository.graph.GraphHelper.removePropagatedTraitNameFromVertex; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromVertex; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH; @@ -191,6 +213,17 @@ public final class EntityGraphRetriever { return ret; } + public AtlasClassification toAtlasClassification(AtlasVertex classificationVertex) throws AtlasBaseException { + AtlasClassification ret = new AtlasClassification(getTypeName(classificationVertex)); + + ret.setEntityGuid(AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_ENTITY_GUID, String.class)); + ret.setPropagate(AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class)); + + mapAttributes(classificationVertex, ret, null); + + return ret; + } + public AtlasVertex getReferencedEntityVertex(AtlasEdge edge, AtlasRelationshipEdgeDirection relationshipDirection, AtlasVertex parentVertex) throws AtlasBaseException { AtlasVertex entityVertex = null; @@ -245,7 +278,7 @@ public final class EntityGraphRetriever { } private AtlasEntity mapVertexToAtlasEntity(AtlasVertex entityVertex, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException { - String guid = GraphHelper.getGuid(entityVertex); + String guid = getGuid(entityVertex); AtlasEntity entity = entityExtInfo != null ? entityExtInfo.getEntity(guid) : null; if (entity == null) { @@ -265,7 +298,7 @@ public final class EntityGraphRetriever { mapRelationshipAttributes(entityVertex, entity); - mapClassifications(entityVertex, entity, entityExtInfo); + mapClassifications(entityVertex, entity); } return entity; @@ -284,7 +317,7 @@ public final class EntityGraphRetriever { ret.setTypeName(typeName); ret.setGuid(guid); ret.setStatus(GraphHelper.getStatus(entityVertex)); - ret.setClassificationNames(GraphHelper.getTraitNames(entityVertex)); + ret.setClassificationNames(getAllTraitNames(entityVertex)); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); @@ -347,8 +380,8 @@ public final class EntityGraphRetriever { LOG.debug("Mapping system attributes for type {}", entity.getTypeName()); } - entity.setGuid(GraphHelper.getGuid(entityVertex)); - entity.setTypeName(GraphHelper.getTypeName(entityVertex)); + entity.setGuid(getGuid(entityVertex)); + entity.setTypeName(getTypeName(entityVertex)); entity.setStatus(GraphHelper.getStatus(entityVertex)); entity.setVersion(GraphHelper.getVersion(entityVertex)); @@ -377,28 +410,52 @@ public final class EntityGraphRetriever { } } - public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException { + public boolean isPropagationEnabled(AtlasVertex classificationVertex) { + boolean ret = false; + if (classificationVertex != null) { + Boolean enabled = AtlasGraphUtilsV1.getProperty(classificationVertex, CLASSIFICATION_PROPAGATE_KEY, Boolean.class); + + ret = enabled == null ? true : enabled; + } + + return ret; + } + + public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException { AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid); + if (instanceVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - return getClassifications(instanceVertex, null); + return getClassifications(instanceVertex); } public List<AtlasClassification> getClassifications(AtlasVertex instanceVertex) throws AtlasBaseException { - return getClassifications(instanceVertex, null); + final List<AtlasClassification> classifications = getClassifications(instanceVertex, null); + final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(instanceVertex, null); + + classifications.addAll(propagatedClassifications); + + return classifications; } public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException { - AtlasVertex instanceVertex = AtlasGraphUtilsV1.findByGuid(guid); + if (instanceVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - List<AtlasClassification> classifications = getClassifications(instanceVertex, classificationName); + List<AtlasClassification> classifications = null; + + try { + classifications = getClassifications(instanceVertex, classificationName); + } catch (AtlasBaseException excp) { + // ignore and look for propagated classifications + classifications = getPropagatedClassifications(instanceVertex, classificationName); + } if(CollectionUtils.isEmpty(classifications)) { throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); @@ -409,54 +466,141 @@ public final class EntityGraphRetriever { private List<AtlasClassification> getClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException { - List<AtlasClassification> classifications = new ArrayList<>(); - List<String> classificationNames = GraphHelper.getTraitNames(instanceVertex); + List<AtlasClassification> ret = new ArrayList<>(); + List<String> classificationNames = getTraitNames(instanceVertex); if (CollectionUtils.isNotEmpty(classificationNames)) { - for (String classificationName : classificationNames) { - AtlasClassification classification; - if (StringUtils.isNotEmpty(classificationNameFilter)) { - if (classificationName.equals(classificationNameFilter)) { - classification = getClassification(instanceVertex, classificationName); - classifications.add(classification); - return classifications; - } - } else { - classification = getClassification(instanceVertex, classificationName); - classifications.add(classification); + if (StringUtils.isNotEmpty(classificationNameFilter)) { + if (classificationNames.contains(classificationNameFilter)) { + ret.add(getClassification(instanceVertex, classificationNameFilter)); + } + } else { + for (String classificationName : classificationNames) { + ret.add(getClassification(instanceVertex, classificationName)); } } + } + + if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter); + } + + return ret; + } + private List<AtlasClassification> getPropagatedClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException { + List<AtlasClassification> ret = new ArrayList<>(); + List<String> classificationNames = getPropagatedTraitNames(instanceVertex); + if (CollectionUtils.isNotEmpty(classificationNames)) { if (StringUtils.isNotEmpty(classificationNameFilter)) { - //Should not reach here if classification present - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter); + if (classificationNames.contains(classificationNameFilter)) { + ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationNameFilter)); + } + } else { + for (String classificationName : new HashSet<>(classificationNames)) { + ret.addAll(getAllPropagatedClassifications(instanceVertex, classificationName)); + } } } - return classifications; + + if (ret.isEmpty() && StringUtils.isNotEmpty(classificationNameFilter)) { + throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationNameFilter); + } + + return ret; } - private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException { + private List<AtlasClassification> getAllPropagatedClassifications(AtlasVertex vertex, String classificationName) throws AtlasBaseException { + List<AtlasClassification> ret = new ArrayList<>(); + String edgeLabel = getPropagatedEdgeLabel(classificationName); + Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); - AtlasClassification ret = null; - if (LOG.isDebugEnabled()) { - LOG.debug("mapping classification {} to atlas entity", classificationName); + if (edges != null) { + for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) { + AtlasEdge edge = iterator.next(); + + if (edge != null) { + AtlasClassification classification = toAtlasClassification(edge.getInVertex()); + + ret.add(classification); + } + } + } + + return ret; + } + + protected List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex instanceVertex) { + List<AtlasVertex> ret = new ArrayList<>(); + List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, false); + List<AtlasVertex> propagatedClassificationVertices = getPropagationEnabledClassificationVertices(instanceVertex, true); + + if (CollectionUtils.isNotEmpty(classificationVertices)) { + ret.addAll(classificationVertices); + } + + if (CollectionUtils.isNotEmpty(propagatedClassificationVertices)) { + ret.addAll(propagatedClassificationVertices); + } + + return ret; + } + + private List<AtlasVertex> getPropagationEnabledClassificationVertices(AtlasVertex vertex, boolean propagated) { + List<AtlasVertex> ret = new ArrayList<>(); + List<String> classificationNames = (propagated) ? getPropagatedTraitNames(vertex) : getTraitNames(vertex); + + if (CollectionUtils.isNotEmpty(classificationNames)) { + for (String classificationName : classificationNames) { + String traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName); + Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, traitLabel); + + if (edges != null) { + for (Iterator<AtlasEdge> iterator = edges.iterator(); iterator.hasNext(); ) { + AtlasEdge edge = iterator.next(); + + if (edge != null) { + AtlasVertex classificationVertex = edge.getInVertex(); + + if (isPropagationEnabled(classificationVertex)) { + ret.add(classificationVertex); + } + } + } + } + } } - Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, classificationName); - AtlasEdge edge = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null; + return ret; + } - if (edge != null) { - ret = new AtlasClassification(classificationName); - mapAttributes(edge.getInVertex(), ret, null); + public AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException { + AtlasClassification ret = getClassification(instanceVertex, classificationName, false); + + // if no classification with the given name was directly associated, look for a propagated classification + if (ret == null) { + ret = getClassification(instanceVertex, classificationName, true); } return ret; } - private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException { - final List<AtlasClassification> classifications = getClassifications(entityVertex, null); + private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName, boolean propagated) throws AtlasBaseException { + String traitLabel = (propagated) ? getPropagatedEdgeLabel(classificationName) : getTraitLabel(classificationName); + Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, traitLabel); + AtlasEdge edge = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null; + AtlasClassification ret = edge != null ? toAtlasClassification(edge.getInVertex()) : null; + + return ret; + } + + private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException { + final List<AtlasClassification> classifications = getClassifications(entityVertex, null); + final List<AtlasClassification> propagatedClassifications = getPropagatedClassifications(entityVertex, null); + entity.setClassifications(classifications); + entity.addClassifications(propagatedClassifications); } private Object mapVertexToAttribute(AtlasVertex entityVertex, AtlasAttribute attribute, AtlasEntityExtInfo entityExtInfo) throws AtlasBaseException { @@ -664,7 +808,7 @@ public final class EntityGraphRetriever { ret = AtlasTypeUtil.getAtlasObjectId(entity); } } else { - ret = new AtlasObjectId(GraphHelper.getGuid(referenceVertex), GraphHelper.getTypeName(referenceVertex)); + ret = new AtlasObjectId(getGuid(referenceVertex), getTypeName(referenceVertex)); } } } @@ -681,7 +825,7 @@ public final class EntityGraphRetriever { if (GraphHelper.elementExists(edge)) { final AtlasVertex referenceVertex = edge.getInVertex(); - ret = new AtlasStruct(GraphHelper.getTypeName(referenceVertex)); + ret = new AtlasStruct(getTypeName(referenceVertex)); mapAttributes(referenceVertex, ret, entityExtInfo); } @@ -756,11 +900,11 @@ public final class EntityGraphRetriever { Iterator<AtlasEdge> edges = null; if (attribute.getRelationshipEdgeDirection() == IN) { - edges = graphHelper.getIncomingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel()); + edges = getIncomingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel()); } else if (attribute.getRelationshipEdgeDirection() == OUT) { - edges = graphHelper.getOutGoingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel()); + edges = getOutGoingEdgesByLabel(entityVertex, attribute.getRelationshipEdgeLabel()); } else if (attribute.getRelationshipEdgeDirection() == BOTH) { - edges = graphHelper.getAdjacentEdgesByLabel(entityVertex, AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel()); + edges = getAdjacentEdgesByLabel(entityVertex, AtlasEdgeDirection.BOTH, attribute.getRelationshipEdgeLabel()); } if (edges != null) { @@ -787,8 +931,8 @@ public final class EntityGraphRetriever { } if (referenceVertex != null) { - String entityTypeName = GraphHelper.getTypeName(referenceVertex); - String entityGuid = GraphHelper.getGuid(referenceVertex); + String entityTypeName = getTypeName(referenceVertex); + String entityGuid = getGuid(referenceVertex); AtlasRelationship relationship = mapEdgeToAtlasRelationship(edge); ret = new AtlasRelatedObjectId(entityGuid, entityTypeName, relationship.getGuid(), @@ -835,8 +979,8 @@ public final class EntityGraphRetriever { LOG.debug("Mapping system attributes for relationship"); } - relationship.setGuid(GraphHelper.getGuid(edge)); - relationship.setTypeName(GraphHelper.getTypeName(edge)); + relationship.setGuid(getRelationshipGuid(edge)); + relationship.setTypeName(getTypeName(edge)); relationship.setCreatedBy(GraphHelper.getCreatedByAsString(edge)); relationship.setUpdatedBy(GraphHelper.getModifiedByAsString(edge)); @@ -856,11 +1000,11 @@ public final class EntityGraphRetriever { AtlasVertex end1Vertex = edge.getOutVertex(); AtlasVertex end2Vertex = edge.getInVertex(); - relationship.setEnd1(new AtlasObjectId(GraphHelper.getGuid(end1Vertex), GraphHelper.getTypeName(end1Vertex))); - relationship.setEnd2(new AtlasObjectId(GraphHelper.getGuid(end2Vertex), GraphHelper.getTypeName(end2Vertex))); + relationship.setEnd1(new AtlasObjectId(getGuid(end1Vertex), getTypeName(end1Vertex))); + relationship.setEnd2(new AtlasObjectId(getGuid(end2Vertex), getTypeName(end2Vertex))); relationship.setLabel(edge.getLabel()); - relationship.setPropagateTags(GraphHelper.getPropagateTags(edge)); + relationship.setPropagateTags(getPropagateTags(edge)); return relationship; } @@ -881,4 +1025,152 @@ public final class EntityGraphRetriever { relationship.setAttribute(attribute.getName(), attrValue); } } -} + + public void addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { + if (edge == null) { + return; + } + + AtlasVertex outVertex = edge.getOutVertex(); + AtlasVertex inVertex = edge.getInVertex(); + + if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { + addTagPropagation(outVertex, inVertex, edge); + } + + if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { + addTagPropagation(inVertex, outVertex, edge); + } + } + + public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { + if (edge == null) { + return; + } + + AtlasVertex outVertex = edge.getOutVertex(); + AtlasVertex inVertex = edge.getInVertex(); + + if (propagateTags == PropagateTags.ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { + removeTagPropagation(outVertex, inVertex, edge); + } + + if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { + removeTagPropagation(inVertex, outVertex, edge); + } + } + + private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { + final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); + final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; + + if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size()); + } + + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationName = getTypeName(classificationVertex); + String propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName); + AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex); + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); + + for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { + if (edgeExists(impactedEntityVertex, classificationVertex, classificationName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), classificationName); + } + + continue; + } else if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel); + } + + continue; + } + + String entityTypeName = getTypeName(impactedEntityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + if (!classificationType.canApplyToEntityType(entityType)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex)); + } + + continue; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel); + } + + graphHelper.addEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel); + + GraphHelper.addListProperty(impactedEntityVertex, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName); + } + } + } + } + + private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { + final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); + final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; + + if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size()); + } + + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationName = getTypeName(classificationVertex); + String propagatedEdgeLabel = getPropagatedEdgeLabel(classificationName); + AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex); + List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge)); + + for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { + if (referrals.contains(impactedEntityVertex)) { + if (LOG.isDebugEnabled()) { + if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName, getTypeName(associatedEntityVertex)); + } else { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel, classificationName); + } + } + + continue; + } + + // remove propagated classification edge and classificationName from propagatedTraitNames vertex property + if (edgeExists(impactedEntityVertex, classificationVertex, propagatedEdgeLabel)) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel); + } + + AtlasEdge propagatedEdge = graphHelper.getOrCreateEdge(impactedEntityVertex, classificationVertex, propagatedEdgeLabel); + + graphHelper.removeEdge(propagatedEdge); + + removePropagatedTraitNameFromVertex(impactedEntityVertex, classificationName); + } catch (RepositoryException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), propagatedEdgeLabel); + } + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java index 1fda241..58e3492 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java @@ -63,6 +63,18 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { return "g.V().range(0,1).toList()"; case GREMLIN_SEARCH_RETURNS_EDGE_ID: return "g.E().range(0,1).toList()"; + + case TAG_PROPAGATION_IMPACTED_INSTANCES: + return "g.V().has('__guid', guid).aggregate('src')" + + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).inV(), " + + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).outV())" + + ".dedup().where(without('src')).simplePath()).emit().toList();"; + + case TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL: + return "g.V().has('__guid', guid).aggregate('src')" + + ".repeat(union(outE().has('__state', 'ACTIVE').has('tagPropagation', within('ONE_TO_TWO', 'BOTH')).has('_r__guid', neq(relationshipGuid)).inV(), " + + "inE().has('__state', 'ACTIVE').has('tagPropagation', within('TWO_TO_ONE', 'BOTH')).has('_r__guid', neq(relationshipGuid)).outV())" + + ".dedup().where(without('src')).simplePath()).emit().toList();"; } return super.getQuery(gremlinQuery); } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index a79abaa..3e3ee11 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -81,6 +81,9 @@ public abstract class AtlasGremlinQueryProvider { COMPARE_ENDS_WITH, COMPARE_CONTAINS, COMPARE_IS_NULL, - COMPARE_NOT_NULL + COMPARE_NOT_NULL, + + TAG_PROPAGATION_IMPACTED_INSTANCES, + TAG_PROPAGATION_IMPACTED_INSTANCES_FOR_REMOVAL } } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java index 0e1e5b6..85f0d06 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java @@ -25,6 +25,7 @@ import org.apache.atlas.repository.graphdb.GraphDatabase; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,7 @@ public class AtlasRepositoryConfiguration { private static Integer typeUpdateLockMaxWaitTimeInSeconds = null; private static final String ENABLE_FULLTEXT_SEARCH_PROPERTY = "atlas.search.fulltext.enable"; + private static final String ENTITY_NOTIFICATION_VERSION_PROPERTY = "atlas.notification.entity.version"; /** * Configures whether the full text vertex property is populated. Turning this off @@ -62,6 +64,19 @@ public class AtlasRepositoryConfiguration { return ApplicationProperties.get().getBoolean(ENABLE_FULLTEXT_SEARCH_PROPERTY, true); } + public static boolean isV2EntityNotificationEnabled() { + boolean ret; + try { + String notificationVersion = ApplicationProperties.get().getString(ENTITY_NOTIFICATION_VERSION_PROPERTY, "v2"); + + return StringUtils.equalsIgnoreCase(notificationVersion, "v2"); + } catch (AtlasException e) { + ret = true; + } + + return ret; + } + private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = "atlas.EntityAuditRepository.impl"; @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/TestModules.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java index 13bdcb0..c901e89 100644 --- a/repository/src/test/java/org/apache/atlas/TestModules.java +++ b/repository/src/test/java/org/apache/atlas/TestModules.java @@ -30,8 +30,10 @@ import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.discovery.EntityLineageService; import org.apache.atlas.graph.GraphSandboxUtil; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.repository.audit.EntityAuditListener; +import org.apache.atlas.repository.audit.EntityAuditListenerV2; import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -145,6 +147,10 @@ public class TestModules { Multibinder.newSetBinder(binder(), EntityChangeListener.class); entityChangeListenerBinder.addBinding().to(EntityAuditListener.class); + Multibinder<EntityChangeListenerV2> entityChangeListenerV2Binder = + Multibinder.newSetBinder(binder(), EntityChangeListenerV2.class); + entityChangeListenerV2Binder.addBinding().to(EntityAuditListenerV2.class); + final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get()); requestInjection(graphTransactionInterceptor); bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java index 47a61ee..4d70b7f 100644 --- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java @@ -46,9 +46,9 @@ public class AuditRepositoryTestBase { EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1", new Referenceable(rand())); - eventRepository.putEvents(event); + eventRepository.putEventsV1(event); - List<EntityAuditEvent> events = eventRepository.listEvents(event.getEntityId(), null, (short) 10); + List<EntityAuditEvent> events = eventRepository.listEventsV1(event.getEntityId(), null, (short) 10); assertEquals(events.size(), 1); assertEventEquals(events.get(0), event); @@ -67,28 +67,28 @@ public class AuditRepositoryTestBase { //Add events for both ids EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity); - eventRepository.putEvents(event); + eventRepository.putEventsV1(event); expectedEvents.add(event); - eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity)); - eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity)); + eventRepository.putEventsV1(new EntityAuditEvent(id1, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i, entity)); + eventRepository.putEventsV1(new EntityAuditEvent(id3, ts - i, "user" + i, EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i, entity)); } //Use ts for which there is no event - ts + 2 - List<EntityAuditEvent> events = eventRepository.listEvents(id2, null, (short) 3); + List<EntityAuditEvent> events = eventRepository.listEventsV1(id2, null, (short) 3); assertEquals(events.size(), 3); assertEventEquals(events.get(0), expectedEvents.get(0)); assertEventEquals(events.get(1), expectedEvents.get(1)); assertEventEquals(events.get(2), expectedEvents.get(2)); //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id - events = eventRepository.listEvents(id2, events.get(2).getEventKey(), (short) 3); + events = eventRepository.listEventsV1(id2, events.get(2).getEventKey(), (short) 3); assertEquals(events.size(), 1); assertEventEquals(events.get(0), expectedEvents.get(2)); } @Test public void testInvalidEntityId() throws Exception { - List<EntityAuditEvent> events = eventRepository.listEvents(rand(), null, (short) 3); + List<EntityAuditEvent> events = eventRepository.listEventsV1(rand(), null, (short) 3); assertEquals(events.size(), 0); } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java new file mode 100644 index 0000000..01a95cf --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.v1.model.notification.EntityNotificationV2; +import org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.notification.NotificationEntityChangeListener.ATLAS_ENTITY_NOTIFICATION_PROPERTY; +import static org.apache.atlas.notification.NotificationInterface.NotificationType.ENTITIES; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_DELETE; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_UPDATE; +import static org.apache.atlas.repository.graph.GraphHelper.isInternalType; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_CREATE; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_DELETE; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.ENTITY_UPDATE; + +@Component +public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { + private final AtlasTypeRegistry typeRegistry; + private final NotificationInterface notificationInterface; + private final Map<String, List<String>> notificationAttributesCache = new HashMap<>(); + + private static Configuration APPLICATION_PROPERTIES = null; + + @Inject + public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, NotificationInterface notificationInterface) { + this.typeRegistry = typeRegistry; + this.notificationInterface = notificationInterface; + } + + @Override + public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + notifyEntityEvents(entities, ENTITY_CREATE); + } + + @Override + public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + notifyEntityEvents(entities, ENTITY_UPDATE); + } + + @Override + public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + notifyEntityEvents(entities, ENTITY_DELETE); + } + + @Override + public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { + notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_ADD); + } + + @Override + public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { + notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_UPDATE); + } + + @Override + public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException { + notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE); + } + + private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException { + List<EntityNotificationV2> messages = new ArrayList<>(); + + for (AtlasEntity entity : entities) { + if (isInternalType(entity.getTypeName())) { + continue; + } + + filterNotificationAttributes(entity); + + messages.add(new EntityNotificationV2(entity, operationType, getAllClassifications(entity))); + } + + if (!messages.isEmpty()) { + try { + notificationInterface.send(ENTITIES, messages); + } catch (NotificationException e) { + throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name()); + } + } + } + + private List<AtlasClassification> getAllClassifications(AtlasEntity entity) { + List<AtlasClassification> ret = getAllClassifications(entity.getClassifications(), typeRegistry); + + return ret; + } + + private static List<AtlasClassification> getAllClassifications(List<AtlasClassification> classifications, AtlasTypeRegistry typeRegistry) { + List<AtlasClassification> ret = new LinkedList<>(); + + if (CollectionUtils.isNotEmpty(classifications)) { + for (AtlasClassification classification : classifications) { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName()); + Set<String> superTypeNames = classificationType != null ? classificationType.getAllSuperTypes() : null; + + ret.add(classification); + + if (CollectionUtils.isNotEmpty(superTypeNames)) { + for (String superTypeName : superTypeNames) { + AtlasClassification superTypeClassification = new AtlasClassification(superTypeName); + + superTypeClassification.setEntityGuid(classification.getEntityGuid()); + superTypeClassification.setPropagate(classification.isPropagate()); + + if (MapUtils.isNotEmpty(classification.getAttributes())) { + AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName); + + if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) { + Map<String, Object> superTypeClassificationAttributes = new HashMap<>(); + + for (Map.Entry<String, Object> attrEntry : classification.getAttributes().entrySet()) { + String attrName = attrEntry.getKey(); + + if (superType.getAllAttributes().containsKey(attrName)) { + superTypeClassificationAttributes.put(attrName, attrEntry.getValue()); + } + } + + superTypeClassification.setAttributes(superTypeClassificationAttributes); + } + } + + ret.add(superTypeClassification); + } + } + } + } + + return ret; + } + + private void filterNotificationAttributes(AtlasEntity entity) { + Map<String, Object> attributesMap = entity.getAttributes(); + List<String> notificationAttrs = getNotificationAttributes(entity.getTypeName()); + + if (MapUtils.isNotEmpty(attributesMap) && CollectionUtils.isNotEmpty(notificationAttrs)) { + Collection<String> attributesToRemove = CollectionUtils.subtract(attributesMap.keySet(), notificationAttrs); + + for (String attributeToRemove : attributesToRemove) { + attributesMap.remove(attributeToRemove); + } + } + } + + private List<String> getNotificationAttributes(String entityType) { + List<String> ret = null; + + initApplicationProperties(); + + if (notificationAttributesCache.containsKey(entityType)) { + ret = notificationAttributesCache.get(entityType); + } else if (APPLICATION_PROPERTIES != null) { + String[] notificationAttributes = APPLICATION_PROPERTIES.getStringArray(ATLAS_ENTITY_NOTIFICATION_PROPERTY + "." + + entityType + "." + "attributes.include"); + + if (notificationAttributes != null) { + ret = Arrays.asList(notificationAttributes); + } + + notificationAttributesCache.put(entityType, ret); + } + + return ret; + } + + private void initApplicationProperties() { + if (APPLICATION_PROPERTIES == null) { + try { + APPLICATION_PROPERTIES = ApplicationProperties.get(); + } catch (AtlasException ex) { + // ignore + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index 396a292..a3e5949 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -42,7 +42,7 @@ import java.util.*; */ @Component public class NotificationEntityChangeListener implements EntityChangeListener { - private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; + protected static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; private final NotificationInterface notificationInterface; private final AtlasTypeRegistry typeRegistry; http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java index 715a54d..4f4f091 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java @@ -18,7 +18,6 @@ package org.apache.atlas.web.resources; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -28,7 +27,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; import org.apache.atlas.EntityAuditEvent; -import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; @@ -98,7 +97,7 @@ public class EntityResource { private final AtlasTypeRegistry typeRegistry; private final EntityREST entityREST; private final EntityAuditRepository entityAuditRepository; - private final AtlasDiscoveryService atlasDiscoveryService; + private final AtlasInstanceConverter instanceConverter; @Context UriInfo uriInfo; @@ -109,13 +108,13 @@ public class EntityResource { final AtlasTypeRegistry typeRegistry, final EntityREST entityREST, final EntityAuditRepository entityAuditRepository, - final AtlasDiscoveryService atlasDiscoveryService) { + final AtlasInstanceConverter instanceConverter) { this.restAdapters = restAdapters; this.entitiesStore = entitiesStore; this.typeRegistry = typeRegistry; this.entityREST = entityREST; this.entityAuditRepository = entityAuditRepository; - this.atlasDiscoveryService = atlasDiscoveryService; + this.instanceConverter = instanceConverter; } /** @@ -1133,20 +1132,27 @@ public class EntityResource { AtlasPerfTracer perf = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Audit events request for entity {}, start key {}, number of results required {}", guid, startKey, count); - } - try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityResource.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")"); } - List<EntityAuditEvent> events = entityAuditRepository.listEvents(guid, startKey, count); + List events = entityAuditRepository.listEvents(guid, startKey, count); + List<EntityAuditEvent> v1Events = new ArrayList<>(); + + for (Object event : events) { + if (event instanceof EntityAuditEvent) { + v1Events.add((EntityAuditEvent) event); + } else if (event instanceof EntityAuditEventV2) { + v1Events.add(instanceConverter.toV1AuditEvent((EntityAuditEventV2) event)); + } else { + LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null"); + } + } Map<String, Object> response = new HashMap<>(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.EVENTS, events); + response.put(AtlasClient.EVENTS, v1Events); return Response.ok(AtlasJson.toV1Json(response)).build(); } catch (IllegalArgumentException e) { LOG.error("Unable to get audit events for entity guid={} startKey={}", guid, startKey, e); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 21402e1..fdafa2c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -18,6 +18,8 @@ package org.apache.atlas.web.rest; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; @@ -26,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream; @@ -38,6 +42,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.inject.Inject; @@ -45,6 +50,7 @@ import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -67,17 +73,24 @@ import java.util.Map; @Singleton @Service public class EntityREST { + private static final Logger LOG = LoggerFactory.getLogger(EntityREST.class); private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.EntityREST"); public static final String PREFIX_ATTR = "attr:"; - private final AtlasTypeRegistry typeRegistry; - private final AtlasEntityStore entitiesStore; + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityStore entitiesStore; + private final EntityAuditRepository auditRepository; + private final AtlasInstanceConverter instanceConverter; + @Inject - public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) { - this.typeRegistry = typeRegistry; - this.entitiesStore = entitiesStore; + public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, + EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) { + this.typeRegistry = typeRegistry; + this.entitiesStore = entitiesStore; + this.auditRepository = auditRepository; + this.instanceConverter = instanceConverter; } /** @@ -409,14 +422,14 @@ public class EntityREST { @PUT @Path("/guid/{guid}/classifications") @Produces(Servlets.JSON_MEDIA_TYPE) - public void updateClassification(@PathParam("guid") final String guid, List<AtlasClassification> classifications) throws AtlasBaseException { + public void updateClassifications(@PathParam("guid") final String guid, List<AtlasClassification> classifications) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); AtlasPerfTracer perf = null; try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { - perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.updateClassification(" + guid + ")"); + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.updateClassifications(" + guid + ")"); } if (StringUtils.isEmpty(guid)) { @@ -581,6 +594,38 @@ public class EntityREST { } } + @GET + @Path("{guid}/audit") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey, + @QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")"); + } + + List events = auditRepository.listEvents(guid, startKey, count); + List<EntityAuditEventV2> ret = new ArrayList<>(); + + for (Object event : events) { + if (event instanceof EntityAuditEventV2) { + ret.add((EntityAuditEventV2) event); + } else if (event instanceof EntityAuditEvent) { + ret.add(instanceConverter.toV2AuditEvent((EntityAuditEvent) event)); + } else { + LOG.warn("unknown entity-audit event type {}. Ignored", event != null ? event.getClass().getCanonicalName() : "null"); + } + } + + return ret; + } finally { + AtlasPerfTracer.log(perf); + } + } + private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException { AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java index ea6fe31..f0bc962 100644 --- a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java +++ b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java @@ -164,7 +164,7 @@ public class TestEntityREST { put("tag", "tagName_updated"); }}); - entityREST.updateClassification(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification, testClassification))); + entityREST.updateClassifications(dbEntity.getGuid(), new ArrayList<>(Arrays.asList(phiClassification, testClassification))); AtlasClassification updatedClassification = entityREST.getClassification(dbEntity.getGuid(), TestUtilsV2.PHI); Assert.assertNotNull(updatedClassification); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java index f769431..e4887d2 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java @@ -808,7 +808,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT { } catch (AtlasServiceException e) { assertNotNull(e); assertNotNull(e.getStatus()); - assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND); + assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java index 0f43d6f..dabb2ef 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java @@ -560,7 +560,7 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { fail("Deletion should've failed for non-existent trait association"); } catch (AtlasServiceException ex) { Assert.assertNotNull(ex.getStatus()); - assertEquals(ex.getStatus(), ClientResponse.Status.NOT_FOUND); + assertEquals(ex.getStatus(), ClientResponse.Status.BAD_REQUEST); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/webapp/src/test/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/atlas-application.properties b/webapp/src/test/resources/atlas-application.properties index d900916..62fa603 100644 --- a/webapp/src/test/resources/atlas-application.properties +++ b/webapp/src/test/resources/atlas-application.properties @@ -64,6 +64,7 @@ atlas.lineage.schema.query.hive_table_v1=hive_table_v1 where __guid='%s'\, colum ######### Notification Configs ######### atlas.notification.embedded=true +atlas.notification.entity.version=v1 atlas.kafka.zookeeper.connect=localhost:19026 atlas.kafka.bootstrap.servers=localhost:19027
