Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 707bcb1cb -> 2a547434b


ATLAS-2927: Update lineage query for Process entities


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2a547434
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2a547434
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2a547434

Branch: refs/heads/branch-0.8
Commit: 2a547434b5dd5a8a78ed915f17b05b674aa9df95
Parents: 707bcb1
Author: Sarath Subramanian <[email protected]>
Authored: Wed Oct 24 11:10:58 2018 -0700
Committer: Sarath Subramanian <[email protected]>
Committed: Wed Oct 24 11:10:58 2018 -0700

----------------------------------------------------------------------
 .../atlas/discovery/EntityLineageService.java   | 124 +++++++++++--------
 .../atlas/util/AtlasGremlin2QueryProvider.java  |  10 +-
 2 files changed, 78 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/2a547434/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index c928d35..472d692 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -26,8 +26,10 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -36,6 +38,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +49,7 @@ import java.util.Set;
 
 import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
 import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
+import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED;
 import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
 import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
 import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
@@ -114,39 +119,14 @@ public class EntityLineageService implements AtlasLineageService {
     private AtlasLineageInfo getLineageInfo(String guid, LineageDirection 
direction, int depth, boolean isDataSet) throws AtlasBaseException {
         Map<String, AtlasEntityHeader> entities     = new HashMap<>();
         Set<LineageRelation>           relations    = new HashSet<>();
-        String                         lineageQuery = getLineageQuery(guid, 
direction, depth, isDataSet);
-
-        List paths = (List) graph.executeGremlinScript(lineageQuery, true);
-
-        if (CollectionUtils.isNotEmpty(paths)) {
-            for (Object path : paths) {
-                if (path instanceof List) {
-                    List vertices = (List) path;
-
-                    if (CollectionUtils.isNotEmpty(vertices)) {
-                        AtlasEntityHeader prev = null;
-
-                        for (Object vertex : vertices) {
-                            if (!(vertex instanceof AtlasVertex)) {
-                                continue;
-                            }
-
-                            AtlasEntityHeader entity = 
entityRetriever.toAtlasEntityHeader((AtlasVertex)vertex);
-
-                            if (!entities.containsKey(entity.getGuid())) {
-                                entities.put(entity.getGuid(), entity);
-                            }
-
-                            if (prev != null) {
-                                if (direction.equals(INPUT)) {
-                                    relations.add(new 
LineageRelation(entity.getGuid(), prev.getGuid()));
-                                } else if (direction.equals(OUTPUT)) {
-                                    relations.add(new 
LineageRelation(prev.getGuid(), entity.getGuid()));
-                                }
-                            }
-                            prev = entity;
-                        }
-                    }
+        final Map<String, Object>      bindings     = new HashMap<>();
+        String                         lineageQuery = getLineageQuery(guid, 
direction, depth, isDataSet, bindings);
+        List                           edges        = 
executeGremlinScript(lineageQuery, bindings);
+
+        if (CollectionUtils.isNotEmpty(edges)) {
+            for (Object edge : edges) {
+                if (edge instanceof AtlasEdge) {
+                    processEdge((AtlasEdge) edge, entities, relations);
                 }
             }
         }
@@ -154,6 +134,45 @@ public class EntityLineageService implements AtlasLineageService {
         return new AtlasLineageInfo(guid, entities, relations, direction, 
depth);
     }
 
+    private List executeGremlinScript(String lineageQuery, Map<String, Object> 
bindings) throws AtlasBaseException {
+        List         ret;
+        ScriptEngine engine = graph.getGremlinScriptEngine();
+
+        try {
+            ret = (List) graph.executeGremlinScript(engine, bindings, 
lineageQuery, false);
+        } catch (ScriptException e) {
+            throw new AtlasBaseException(INSTANCE_LINEAGE_QUERY_FAILED, 
lineageQuery);
+        } finally {
+            graph.releaseGremlinScriptEngine(engine);
+        }
+
+        return ret;
+    }
+
+    private void processEdge(final AtlasEdge edge, final Map<String, 
AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws 
AtlasBaseException {
+        AtlasVertex inVertex    = edge.getInVertex();
+        AtlasVertex outVertex   = edge.getOutVertex();
+        String      inGuid      = AtlasGraphUtilsV1.getIdFromVertex(inVertex);
+        String      outGuid     = AtlasGraphUtilsV1.getIdFromVertex(outVertex);
+        boolean     isInputEdge = 
edge.getLabel().equalsIgnoreCase(INPUT_PROCESS_EDGE);
+
+        if (!entities.containsKey(inGuid)) {
+            AtlasEntityHeader entityHeader = 
entityRetriever.toAtlasEntityHeader(inVertex);
+            entities.put(inGuid, entityHeader);
+        }
+
+        if (!entities.containsKey(outGuid)) {
+            AtlasEntityHeader entityHeader = 
entityRetriever.toAtlasEntityHeader(outVertex);
+            entities.put(outGuid, entityHeader);
+        }
+
+        if (isInputEdge) {
+            relations.add(new LineageRelation(inGuid, outGuid));
+        } else {
+            relations.add(new LineageRelation(outGuid, inGuid));
+        }
+    }
+
     private AtlasLineageInfo getBothLineageInfo(String guid, int depth, 
boolean isDataSet) throws AtlasBaseException {
         AtlasLineageInfo inputLineage  = getLineageInfo(guid, INPUT, depth, 
isDataSet);
         AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, 
isDataSet);
@@ -166,30 +185,33 @@ public class EntityLineageService implements AtlasLineageService {
         return ret;
     }
 
-    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth, boolean isDataSet) throws AtlasBaseException {
-        String ret = null;
+    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth, boolean isDataSet, Map<String, Object> bindings) {
+        String ret;
 
-        if (direction.equals(INPUT)) {
-            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
+        if (depth < 1) {
+            ret = isDataSet ? 
gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
+        } else {
+            ret = isDataSet ? 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
+        }
+
+        String incoming = null;
+        String outgoing = null;
 
+        if (direction.equals(INPUT)) {
+            incoming = OUTPUT_PROCESS_EDGE;
+            outgoing = INPUT_PROCESS_EDGE;
         } else if (direction.equals(OUTPUT)) {
-            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
+            incoming = INPUT_PROCESS_EDGE;
+            outgoing = OUTPUT_PROCESS_EDGE;
         }
 
-        return ret;
-    }
+        bindings.put("guid", entityGuid);
+        bindings.put("incomingEdgeLabel", incoming);
+        bindings.put("outgoingEdgeLabel", outgoing);
+        bindings.put("processDepth", depth);
+        bindings.put("dataSetDepth", depth * 2);
 
-    private String generateLineageQuery(String entityGuid, int depth, boolean 
isDataSet, String incomingFrom, String outgoingTo) {
-        String lineageQuery;
-
-        if (depth < 1) {
-            String query = isDataSet ? 
gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
-            lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo);
-        } else {
-            String query = isDataSet ? 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
-            lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo, depth);
-        }
-        return lineageQuery;
+        return ret;
     }
 
     private List<String> getTypeAndAllSuperTypes(AtlasEntityType entityType) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/2a547434/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index 2278aef..c98c34d 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -56,13 +56,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
             case EXPORT_TYPE_DEFAULT:
                 return "g.V().has('__typeName',typeName).has(attrName, 
attrValue).has('__guid').__guid.toList()";
             case FULL_LINEAGE_DATASET:
-                return "g.V('__guid', 
'%s').as('src').in('%s').out('%s').loop('src', {((it.path.contains(it.object)) 
? false : true)}, {true}).path().toList()";
-            case FULL_LINEAGE_PROCESS:
-                return "g.V('__guid', 
'%s').as('src').out('%s').in('%s').loop('src', {((it.path.contains(it.object)) 
? false : true)}, {true}).path().toList()";
+                return "e=[]; g.V('__guid', 
guid).as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src',
 {true}, {true}).toList(); e;";
             case PARTIAL_LINEAGE_DATASET:
-                return "g.V('__guid', 
'%s').as('src').in('%s').out('%s').loop('src', {it.loops <= %s}, 
{true}).path().toList()";
+                return "e=[]; g.V('__guid', 
guid).as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src',
 {it.loops <= dataSetDepth}, {true}).toList(); e;";
+            case FULL_LINEAGE_PROCESS:
+                return "e=[]; g.V('__guid', 
guid).outE(outgoingEdgeLabel).aggregate(e).inV().as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src',
 {true}, {true}).toList(); e;";
             case PARTIAL_LINEAGE_PROCESS:
-                return "g.V('__guid', 
'%s').as('src').out('%s').in('%s').loop('src', {it.loops <= %s}, 
{true}).path().toList()";
+                return "e=[]; g.V('__guid', 
guid).outE(outgoingEdgeLabel).aggregate(e).inV().as('src').inE(incomingEdgeLabel).aggregate(e).outV().outE(outgoingEdgeLabel).aggregate(e).inV().loop('src',
 {it.loops <= processDepth}, {true}).toList(); e;";
             case BASIC_SEARCH_TYPE_FILTER:
                 return ".has('__typeName', T.in, typeNames)";
             case BASIC_SEARCH_CLASSIFICATION_FILTER:

Reply via email to