Repository: atlas Updated Branches: refs/heads/master d5f46e3f5 -> 8cc12be18
ATLAS-2905: Generate lineage information for process entities Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8cc12be1 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8cc12be1 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8cc12be1 Branch: refs/heads/master Commit: 8cc12be185d388798a9de4e1d798e69de9795374 Parents: d5f46e3 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Tue Oct 2 15:37:45 2018 -0700 Committer: Sarath Subramanian <ssubraman...@hortonworks.com> Committed: Tue Oct 2 15:37:45 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 2 +- .../atlas/discovery/EntityLineageService.java | 75 ++++++++++++-------- .../atlas/util/AtlasGremlin2QueryProvider.java | 4 +- .../atlas/util/AtlasGremlin3QueryProvider.java | 8 ++- .../atlas/util/AtlasGremlinQueryProvider.java | 6 +- 5 files changed, 60 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 47726aa..2fe389c 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -173,7 +173,7 @@ public enum AtlasErrorCode { RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", "RelationshipDef {0} endDef typename {0} cannot be found"), RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to delete a relationship which is already deleted : {0}"), INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", "Updating entityGuid of classification is not allowed."), - INSTANCE_GUID_NOT_DATASET(404, "ATLAS-404-00-011", "Given instance guid {0} is not a dataset"), + INVALID_LINEAGE_ENTITY_TYPE(404, "ATLAS-404-00-011", "Given instance guid {0} with type {1} is not a valid lineage entity type."), INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} has been deleted"), NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"), http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 5d02ea5..6f2f97b 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -19,7 +19,6 @@ package org.apache.atlas.discovery; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; @@ -33,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; -import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; @@ -61,7 +59,16 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; +import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; +import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_PROCESS; @Service public class EntityLineageService implements AtlasLineageService { @@ -87,7 +94,7 @@ public class EntityLineageService implements AtlasLineageService { @Override @GraphTransaction public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { - AtlasLineageInfo lineageInfo; + AtlasLineageInfo ret; AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); @@ -95,17 +102,28 @@ public class EntityLineageService implements AtlasLineageService { AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(entity.getTypeName()); - if (entityType == null || !entityType.getTypeAndAllSuperTypes().contains(AtlasClient.DATA_SET_SUPER_TYPE)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_DATASET, guid); + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, entity.getTypeName()); + } + + boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); + + if (!isDataSet) { + boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + + if (!isProcess) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, entity.getTypeName()); + } + } if (direction != null) { - if (direction.equals(LineageDirection.INPUT)) { - lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, depth); - } else if (direction.equals(LineageDirection.OUTPUT)) { - lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, depth); - } else if (direction.equals(LineageDirection.BOTH)) { - lineageInfo = getBothLineageInfo(guid, depth); + if (direction.equals(INPUT)) { + ret = getLineageInfo(guid, INPUT, depth, isDataSet); + } else if (direction.equals(OUTPUT)) { + ret = getLineageInfo(guid, OUTPUT, depth, isDataSet); + } else if (direction.equals(BOTH)) { + ret = getBothLineageInfo(guid, depth, isDataSet); } else { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString()); } @@ -113,7 +131,7 @@ public class EntityLineageService implements AtlasLineageService { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null); } - return lineageInfo; + return ret; } @Override @@ -184,10 +202,10 @@ public class EntityLineageService implements AtlasLineageService { return columnIds.contains(e.getValue().getGuid()); } - private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { + private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { Map<String, AtlasEntityHeader> entities = new HashMap<>(); Set<LineageRelation> relations = new HashSet<>(); - String lineageQuery = getLineageQuery(guid, direction, depth); + String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet); List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false); @@ -244,38 +262,39 @@ public class EntityLineageService implements AtlasLineageService { } } - private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException { - AtlasLineageInfo inputLineage = getLineageInfo(guid, LineageDirection.INPUT, depth); - AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth); + private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException { + AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet); + AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet); AtlasLineageInfo ret = inputLineage; ret.getRelations().addAll(outputLineage.getRelations()); ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap()); - ret.setLineageDirection(LineageDirection.BOTH); + ret.setLineageDirection(BOTH); return ret; } - private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) { - String lineageQuery = null; + private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) { + String ret = null; - if (direction.equals(LineageDirection.INPUT)) { - lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE); + if (direction.equals(INPUT)) { + ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE); - } else if (direction.equals(LineageDirection.OUTPUT)) { - lineageQuery = generateLineageQuery(entityGuid, depth, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE); + } else if (direction.equals(OUTPUT)) { + ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE); } - return lineageQuery; + return ret; } - private String generateLineageQuery(String entityGuid, int depth, String incomingFrom, String outgoingTo) { + private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) { String lineageQuery; + if (depth < 1) { - String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.FULL_LINEAGE); + String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); } else { - String query = gremlinQueryProvider.getQuery(AtlasGremlinQuery.PARTIAL_LINEAGE); + String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); } return lineageQuery; http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index 82243ad..4be8ce3 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -44,13 +44,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()"; case EXPORT_TYPE_DEFAULT: return "g.V().has('__typeName',typeName).has(attrName, attrValue).has('__guid').__guid.toList()"; - case FULL_LINEAGE: + case FULL_LINEAGE_DATASET: return "g.V('__guid', '%s').as('src').in('%s').out('%s')." + "loop('src', {((it.path.contains(it.object)) ? false : true)}, " + "{((it.object.'__superTypeNames') ? " + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + "path().toList()"; - case PARTIAL_LINEAGE: + case PARTIAL_LINEAGE_DATASET: return "g.V('__guid', '%s').as('src').in('%s').out('%s')." + "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java index 9585a57..866e5af 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java @@ -45,10 +45,14 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { return "g.V().has('__guid', startGuid).outE().inV().has('__guid').project('__guid', 'isProcess').by('__guid').by(map {it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()"; case EXPORT_TYPE_ALL_FOR_TYPE: return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()"; - case FULL_LINEAGE: + case FULL_LINEAGE_DATASET: return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()"; - case PARTIAL_LINEAGE: + case PARTIAL_LINEAGE_DATASET: return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()"; + case FULL_LINEAGE_PROCESS: + return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()"; + case PARTIAL_LINEAGE_PROCESS: + return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()"; case TO_RANGE_LIST: return ".range(startIdx, endIdx).toList()"; case RELATIONSHIP_SEARCH: http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index 8555b4c..d201db3 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider { EXPORT_TYPE_DEFAULT, // Lineage Queries - FULL_LINEAGE, - PARTIAL_LINEAGE, + FULL_LINEAGE_DATASET, + FULL_LINEAGE_PROCESS, + PARTIAL_LINEAGE_DATASET, + PARTIAL_LINEAGE_PROCESS, // Discovery Queries BASIC_SEARCH_TYPE_FILTER,