Repository: atlas Updated Branches: refs/heads/branch-0.8 707bcb1cb -> 2a547434b
ATLAS-2927: Update lineage query for Process entities Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2a547434 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2a547434 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2a547434 Branch: refs/heads/branch-0.8 Commit: 2a547434b5dd5a8a78ed915f17b05b674aa9df95 Parents: 707bcb1 Author: Sarath Subramanian <[email protected]> Authored: Wed Oct 24 11:10:58 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Wed Oct 24 11:10:58 2018 -0700 ---------------------------------------------------------------------- .../atlas/discovery/EntityLineageService.java | 124 +++++++++++-------- .../atlas/util/AtlasGremlin2QueryProvider.java | 10 +- 2 files changed, 78 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/2a547434/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index c928d35..472d692 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -26,8 +26,10 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; +import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; 
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -36,6 +38,8 @@ import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; import javax.inject.Inject; +import javax.script.ScriptEngine; +import javax.script.ScriptException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -45,6 +49,7 @@ import java.util.Set; import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; +import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; @@ -114,39 +119,14 @@ public class EntityLineageService implements AtlasLineageService { private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { Map<String, AtlasEntityHeader> entities = new HashMap<>(); Set<LineageRelation> relations = new HashSet<>(); - String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet); - - List paths = (List) graph.executeGremlinScript(lineageQuery, true); - - if (CollectionUtils.isNotEmpty(paths)) { - for (Object path : paths) { - if (path instanceof List) { - List vertices = (List) path; - - if (CollectionUtils.isNotEmpty(vertices)) { - AtlasEntityHeader prev = null; - - for (Object vertex : vertices) { - if (!(vertex instanceof AtlasVertex)) { - continue; - } - - AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader((AtlasVertex)vertex); - - if (!entities.containsKey(entity.getGuid())) { - entities.put(entity.getGuid(), entity); - } - - if (prev != null) { - if
(direction.equals(INPUT)) { - relations.add(new LineageRelation(entity.getGuid(), prev.getGuid())); - } else if (direction.equals(OUTPUT)) { - relations.add(new LineageRelation(prev.getGuid(), entity.getGuid())); - } - } - prev = entity; - } - } + final Map<String, Object> bindings = new HashMap<>(); + String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings); + List edges = executeGremlinScript(lineageQuery, bindings); + + if (CollectionUtils.isNotEmpty(edges)) { + for (Object edge : edges) { + if (edge instanceof AtlasEdge) { + processEdge((AtlasEdge) edge, entities, relations); } } } @@ -154,6 +134,45 @@ public class EntityLineageService implements AtlasLineageService { return new AtlasLineageInfo(guid, entities, relations, direction, depth); } + private List executeGremlinScript(String lineageQuery, Map<String, Object> bindings) throws AtlasBaseException { + List ret; + ScriptEngine engine = graph.getGremlinScriptEngine(); + + try { + ret = (List) graph.executeGremlinScript(engine, bindings, lineageQuery, false); + } catch (ScriptException e) { + throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, e, lineageQuery); + } finally { + graph.releaseGremlinScriptEngine(engine); + } + + return ret; + } + + private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException { + AtlasVertex inVertex = edge.getInVertex(); + AtlasVertex outVertex = edge.getOutVertex(); + String inGuid = AtlasGraphUtilsV1.getIdFromVertex(inVertex); + String outGuid = AtlasGraphUtilsV1.getIdFromVertex(outVertex); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(INPUT_PROCESS_EDGE); + + if (!entities.containsKey(inGuid)) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex); + entities.put(inGuid, entityHeader); + } + + if (!entities.containsKey(outGuid)) { + AtlasEntityHeader entityHeader =
entityRetriever.toAtlasEntityHeader(outVertex); + entities.put(outGuid, entityHeader); + } + + if (isInputEdge) { + relations.add(new LineageRelation(inGuid, outGuid)); + } else { + relations.add(new LineageRelation(outGuid, inGuid)); + } + } + private AtlasLineageInfo getBothLineageInfo(String guid, int depth, boolean isDataSet) throws AtlasBaseException { AtlasLineageInfo inputLineage = getLineageInfo(guid, INPUT, depth, isDataSet); AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, isDataSet); @@ -166,30 +185,33 @@ public class EntityLineageService implements AtlasLineageService { return ret; } - private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { - String ret = null; + private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet, Map<String, Object> bindings) { + String ret; - if (direction.equals(INPUT)) { - ret = generateLineageQuery(entityGuid, depth, isDataSet, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE); + if (depth < 1) { + ret = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); + } else { + ret = isDataSet ?
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); + } + + String incoming = null; + String outgoing = null; + if (direction.equals(INPUT)) { + incoming = OUTPUT_PROCESS_EDGE; + outgoing = INPUT_PROCESS_EDGE; } else if (direction.equals(OUTPUT)) { - ret = generateLineageQuery(entityGuid, depth, isDataSet, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE); + incoming = INPUT_PROCESS_EDGE; + outgoing = OUTPUT_PROCESS_EDGE; } - return ret; - } + bindings.put("guid", entityGuid); + bindings.put("incomingEdgeLabel", incoming); + bindings.put("outgoingEdgeLabel", outgoing); + bindings.put("processDepth", depth); + bindings.put("dataSetDepth", depth * 2); - private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) { - String lineageQuery; - - if (depth < 1) { - String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); - lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); - } else { - String query = isDataSet ?
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); - lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); - } - return lineageQuery; + return ret; } private List<String> getTypeAndAllSuperTypes(AtlasEntityType entityType) { http://git-wip-us.apache.org/repos/asf/atlas/blob/2a547434/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index 2278aef..c98c34d 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -56,13 +56,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { case EXPORT_TYPE_DEFAULT: return "g.V().has('__typeName',typeName).has(attrName, attrValue).has('__guid').__guid.toList()"; case FULL_LINEAGE_DATASET: - return "g.V('__guid', '%s').as('src').in('%s').out('%s').loop('src', {((it.path.contains(it.object)) ? false : true)}, {true}).path().toList()"; - case FULL_LINEAGE_PROCESS: - return "g.V('__guid', '%s').as('src').out('%s').in('%s').loop('src', {((it.path.contains(it.object)) ?
false : true)}, {true}).path().toList()"; + return "e=[]; g.V('__guid', guid).as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src', {true}, {true}).toList(); e;"; case PARTIAL_LINEAGE_DATASET: - return "g.V('__guid', '%s').as('src').in('%s').out('%s').loop('src', {it.loops <= %s}, {true}).path().toList()"; + return "e=[]; g.V('__guid', guid).as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src', {it.loops <= dataSetDepth}, {true}).toList(); e;"; + case FULL_LINEAGE_PROCESS: + return "e=[]; g.V('__guid', guid).outE(outgoingEdgeLabel).aggregate(e).inV().as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src', {true}, {true}).toList(); e;"; case PARTIAL_LINEAGE_PROCESS: - return "g.V('__guid', '%s').as('src').out('%s').in('%s').loop('src', {it.loops <= %s}, {true}).path().toList()"; + return "e=[]; g.V('__guid', guid).outE(outgoingEdgeLabel).aggregate(e).inV().as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src', {it.loops <= processDepth}, {true}).toList(); e;"; case BASIC_SEARCH_TYPE_FILTER: return ".has('__typeName', T.in, typeNames)"; case BASIC_SEARCH_CLASSIFICATION_FILTER:
