Repository: atlas Updated Branches: refs/heads/master 3b8a34c51 -> 46b9b7c85
ATLAS-2927: Update lineage query for Process entities Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/46b9b7c8 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/46b9b7c8 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/46b9b7c8 Branch: refs/heads/master Commit: 46b9b7c85835b1c4285eddce6c9773024a1b2114 Parents: 3b8a34c Author: Sarath Subramanian <[email protected]> Authored: Sun Oct 21 22:22:38 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Sun Oct 21 22:22:38 2018 -0700 ---------------------------------------------------------------------- .../atlas/discovery/EntityLineageService.java | 68 +++++++++++++------- .../atlas/util/AtlasGremlin3QueryProvider.java | 8 +-- 2 files changed, 49 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 6f2f97b..89c969b 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -41,7 +41,6 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.util.AtlasGremlinQueryProvider; -import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -51,6 +50,8 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.inject.Inject; +import javax.script.ScriptEngine; +import javax.script.ScriptException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -61,6 +62,7 @@ import java.util.stream.Collectors; import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE; import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE; +import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT; @@ -203,16 +205,16 @@ public class EntityLineageService implements AtlasLineageService { } private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth, boolean isDataSet) throws AtlasBaseException { + final Map<String, Object> bindings = new HashMap<>(); + String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet, bindings); + List results = executeGremlinScript(bindings, lineageQuery); Map<String, AtlasEntityHeader> entities = new HashMap<>(); Set<LineageRelation> relations = new HashSet<>(); - String lineageQuery = getLineageQuery(guid, direction, depth, isDataSet); - List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, false); - - if (CollectionUtils.isNotEmpty(edgeMapList)) { - for (Object edgeMap : edgeMapList) { - if (edgeMap instanceof Map) { - for (final Object o : ((Map) edgeMap).entrySet()) { + if (CollectionUtils.isNotEmpty(results)) { + for (Object result : results) { + if (result instanceof Map) { + for (final Object o : ((Map) result).entrySet()) { final Map.Entry entry = (Map.Entry) o; Object value = entry.getValue(); @@ -230,6 +232,8 @@ public class EntityLineageService implements AtlasLineageService { LOG.warn("Invalid value of type {} found, ignoring", (value != null ? value.getClass().getSimpleName() : "null")); } } + } else if (result instanceof AtlasEdge) { + processEdge((AtlasEdge) result, entities, relations); } } } @@ -237,6 +241,21 @@ public class EntityLineageService implements AtlasLineageService { return new AtlasLineageInfo(guid, entities, relations, direction, depth); } + private List executeGremlinScript(Map<String, Object> bindings, String lineageQuery) throws AtlasBaseException { + List ret; + ScriptEngine engine = graph.getGremlinScriptEngine(); + + try { + ret = (List) graph.executeGremlinScript(engine, bindings, lineageQuery, false); + } catch (ScriptException e) { + throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, lineageQuery); + } finally { + graph.releaseGremlinScriptEngine(engine); + } + + return ret; + } + private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException { AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); @@ -274,29 +293,32 @@ public class EntityLineageService implements AtlasLineageService { return ret; } - private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet) { - String ret = null; + private String getLineageQuery(String entityGuid, LineageDirection direction, int depth, boolean isDataSet, Map<String, Object> bindings) { + String incomingFrom = null; + String outgoingTo = null; + String ret; if (direction.equals(INPUT)) { - ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE); - + incomingFrom = PROCESS_OUTPUTS_EDGE; + outgoingTo = PROCESS_INPUTS_EDGE; } else if (direction.equals(OUTPUT)) { - ret = generateLineageQuery(entityGuid, depth, isDataSet, PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE); + incomingFrom = PROCESS_INPUTS_EDGE; + outgoingTo = PROCESS_OUTPUTS_EDGE; } - return ret; - } - - private String generateLineageQuery(String entityGuid, int depth, boolean isDataSet, String incomingFrom, String outgoingTo) { - String lineageQuery; + bindings.put("guid", entityGuid); + bindings.put("incomingEdgeLabel", incomingFrom); + bindings.put("outgoingEdgeLabel", outgoingTo); + bindings.put("depth", depth); if (depth < 1) { - String query = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); - lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo); + ret = isDataSet ? gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : + gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS); } else { - String query = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); - lineageQuery = String.format(query, entityGuid, incomingFrom, outgoingTo, depth); + ret = isDataSet ? gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : + gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS); } - return lineageQuery; + + return ret; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/46b9b7c8/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java index 866e5af..6d3b1a8 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java @@ -46,13 +46,13 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider { case EXPORT_TYPE_ALL_FOR_TYPE: return "g.V().has('__typeName', within(typeName)).has('__guid').values('__guid').toList()"; case FULL_LINEAGE_DATASET: - return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1', 'e2').toList()"; + return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).emit().select('e1', 'e2').toList()"; case PARTIAL_LINEAGE_DATASET: - return "g.V().has('__guid', '%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1', 'e2').toList()"; + return "g.V().has('__guid', guid).repeat(__.inE(incomingEdgeLabel).as('e1').outV().outE(outgoingEdgeLabel).as('e2').inV()).times(depth).emit().select('e1', 'e2').toList()"; case FULL_LINEAGE_PROCESS: - return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1', 'e2').toList()"; + return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).cap('e').unfold().toList()"; case PARTIAL_LINEAGE_PROCESS: - return "g.V().has('__guid', '%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1', 'e2').toList()"; + return "g.V().has('__guid', guid).outE(outgoingEdgeLabel).store('e').inV().repeat(__.inE(incomingEdgeLabel).store('e').outV().outE(outgoingEdgeLabel).store('e').inV()).times(depth).cap('e').unfold().toList()"; case TO_RANGE_LIST: return ".range(startIdx, endIdx).toList()"; case RELATIONSHIP_SEARCH:
