Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 6bc4039aa -> d6bd99065
ATLAS-2905: Generate lineage information for process entities Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/d6bd9906 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/d6bd9906 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/d6bd9906 Branch: refs/heads/branch-0.8 Commit: d6bd9906546ca5adf069ae33f39b8b4afd8ddc84 Parents: 6bc4039 Author: Sarath Subramanian <[email protected]> Authored: Tue Oct 2 15:37:45 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Tue Oct 2 21:42:35 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 3 +- .../atlas/discovery/EntityLineageService.java | 110 +++++++++++-------- .../atlas/util/AtlasGremlin2QueryProvider.java | 20 ++-- .../atlas/util/AtlasGremlinQueryProvider.java | 6 +- 4 files changed, 81 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 8163390..ac82f25 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -92,8 +92,9 @@ public enum AtlasErrorCode { INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(404, "ATLAS-404-00-009", "Instance {0} with unique attribute {1} does not exist"), REFERENCED_ENTITY_NOT_FOUND(404, "ATLAS-404-00-00A", "Referenced entity {0} is not found"), INSTANCE_NOT_FOUND(404, "ATLAS-404-00-00B", "Given instance is invalid/not found: {0}"), + INVALID_LINEAGE_ENTITY_TYPE(404, "ATLAS-404-00-00C", "Given instance guid {0} with type {1} is not a valid lineage entity type."), - // All data conflict 
errors go here + // All data conflict errors go here TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"), TYPE_HAS_REFERENCES(409, "ATLAS-409-00-002", "Given type {0} has references"), INSTANCE_ALREADY_EXISTS(409, "ATLAS-409-00-003", "failed to update entity: {0}"), http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 3ae41c8..c928d35 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -19,7 +19,6 @@ package org.apache.atlas.discovery; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; @@ -27,25 +26,33 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.AtlasGremlinQueryProvider; -import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; import org.apache.commons.collections.CollectionUtils; import 
org.springframework.stereotype.Service; import javax.inject.Inject; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; +import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_PROCESS; + @Service public class EntityLineageService implements AtlasLineageService { private static final String INPUT_PROCESS_EDGE = "__Process.inputs"; @@ -54,30 +61,46 @@ public class EntityLineageService implements AtlasLineageService { private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; private final EntityGraphRetriever entityRetriever; + private final AtlasTypeRegistry atlasTypeRegistry; @Inject - EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph) throws DiscoveryException { + EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph) { this.graph = atlasGraph; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.entityRetriever = new EntityGraphRetriever(typeRegistry); + this.atlasTypeRegistry = typeRegistry; } @Override @GraphTransaction public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) 
throws AtlasBaseException { - AtlasLineageInfo lineageInfo; + AtlasLineageInfo ret; + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(guid); + AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, entity.getTypeName()); + } + + List<String> typeAndAllSuperTypes = getTypeAndAllSuperTypes(entityType); - if (!entityExists(guid)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + boolean isDataSet = typeAndAllSuperTypes.contains(DATA_SET_SUPER_TYPE); + + if (!isDataSet) { + boolean isProcess = typeAndAllSuperTypes.contains(PROCESS_SUPER_TYPE); + + if (!isProcess) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName()); + } } if (direction != null) { - if (direction.equals(LineageDirection.INPUT)) { - lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, depth); - } else if (direction.equals(LineageDirection.OUTPUT)) { - lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, depth); - } else if (direction.equals(LineageDirection.BOTH)) { - lineageInfo = getBothLineageInfo(guid, depth); + if (direction.equals(INPUT)) { + ret = getLineageInfo(guid, INPUT, depth, isDataSet); + } else if (direction.equals(OUTPUT)) { + ret = getLineageInfo(guid, OUTPUT, depth, isDataSet); + } else if (direction.equals(BOTH)) { + ret = getBothLineageInfo(guid, depth, isDataSet); } else { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString()); } @@ -85,13 +108,13 @@ public class EntityLineageService implements AtlasLineageService { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null); } - return lineageInfo; + return ret; } - private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { + private 
AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { Map<String, AtlasEntityHeader> entities = new HashMap<>(); Set<LineageRelation> relations = new HashSet<>(); - String lineageQuery = getLineageQuery(guid, direction, depth); + String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet); List paths = (List) graph.executeGremlinScript(lineageQuery, true); @@ -115,9 +138,9 @@ public class EntityLineageService implements AtlasLineageService { } if (prev != null) { - if (direction.equals(LineageDirection.INPUT)) { + if (direction.equals(INPUT)) { relations.add(new LineageRelation(entity.getGuid(), prev.getGuid())); - } else if (direction.equals(LineageDirection.OUTPUT)) { + } else if (direction.equals(OUTPUT)) { relations.add(new LineageRelation(prev.getGuid(), entity.getGuid())); } } @@ -131,54 +154,55 @@ public class EntityLineageService implements AtlasLineageService { return new AtlasLineageInfo(guid, entities, relations, direction, depth); } - private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException { - AtlasLineageInfo inputLineage = getLineageInfo(guid, LineageDirection.INPUT, depth); - AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth); + private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException { + AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet); + AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet); AtlasLineageInfo ret = inputLineage; ret.getRelations().addAll(outputLineage.getRelations()); ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap()); - ret.setLineageDirection(LineageDirection.BOTH); + ret.setLineageDirection(BOTH); return ret; } - private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException { - String 
lineageQuery = null; + private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { + String ret = null; - if (direction.equals(LineageDirection.INPUT)) { - lineageQuery = generateLineageQuery(entityGuid, depth, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE); + if (direction.equals(INPUT)) { + ret = generateLineageQuery(entityGuid, depth, isDataSet, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE); - } else if (direction.equals(LineageDirection.OUTPUT)) { - lineageQuery = generateLineageQuery(entityGuid, depth, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE); + } else if (direction.equals(OUTPUT)) { + ret = generateLineageQuery(entityGuid, depth, isDataSet, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE); } - return lineageQuery; + return ret; } - private String generateLineageQuery(String entityGuid, int depth, String incomingFrom, String outgoingTo) { + private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) { String lineageQuery; + if (depth < 1) { - String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.FULL_LINEAGE); + String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); } else { - String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.PARTIAL_LINEAGE); + String query = isDataSet ? 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); } return lineageQuery; } - private boolean entityExists(String guid) { - boolean ret = false; - Iterator<AtlasVertex> results = graph.query() - .has(Constants.GUID_PROPERTY_KEY, guid) - .vertices().iterator(); + private List<String> getTypeAndAllSuperTypes(AtlasEntityType entityType) { + List<String> ret = null; + + if (entityType != null) { + ret = new ArrayList<>(); - while (results.hasNext()) { - AtlasVertex entityVertex = results.next(); - List<String> superTypes = GraphHelper.getSuperTypeNames(entityVertex); + ret.add(entityType.getTypeName()); - ret = (CollectionUtils.isNotEmpty(superTypes)) ? superTypes.contains(AtlasClient.DATA_SET_SUPER_TYPE) : false; + if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) { + ret.addAll(entityType.getAllSuperTypes()); + } } return ret; http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index a68e3c3..2278aef 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -55,18 +55,14 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()"; case EXPORT_TYPE_DEFAULT: return "g.V().has('__typeName',typeName).has(attrName, attrValue).has('__guid').__guid.toList()"; - case FULL_LINEAGE: - return "g.V('__guid', 
'%s').as('src').in('%s').out('%s')." + - "loop('src', {((it.path.contains(it.object)) ? false : true)}, " + - "{((it.object.'__superTypeNames') ? " + - "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + - "path().toList()"; - case PARTIAL_LINEAGE: - return "g.V('__guid', '%s').as('src').in('%s').out('%s')." + - "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " + - "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + - "path().toList()"; - + case FULL_LINEAGE_DATASET: + return "g.V('__guid', '%s').as('src').in('%s').out('%s').loop('src', {((it.path.contains(it.object)) ? false : true)}, {true}).path().toList()"; + case FULL_LINEAGE_PROCESS: + return "g.V('__guid', '%s').as('src').out('%s').in('%s').loop('src', {((it.path.contains(it.object)) ? false : true)}, {true}).path().toList()"; + case PARTIAL_LINEAGE_DATASET: + return "g.V('__guid', '%s').as('src').in('%s').out('%s').loop('src', {it.loops <= %s}, {true}).path().toList()"; + case PARTIAL_LINEAGE_PROCESS: + return "g.V('__guid', '%s').as('src').out('%s').in('%s').loop('src', {it.loops <= %s}, {true}).path().toList()"; case BASIC_SEARCH_TYPE_FILTER: return ".has('__typeName', T.in, typeNames)"; case BASIC_SEARCH_CLASSIFICATION_FILTER: http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index 646628f..4c18a5d 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider { EXPORT_TYPE_DEFAULT, // Lineage Queries - FULL_LINEAGE, - PARTIAL_LINEAGE, + FULL_LINEAGE_DATASET, 
+ FULL_LINEAGE_PROCESS, + PARTIAL_LINEAGE_DATASET, + PARTIAL_LINEAGE_PROCESS, // Discovery Queries BASIC_SEARCH_TYPE_FILTER,
