Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 6bc4039aa -> d6bd99065


ATLAS-2905: Generate lineage information for process entities


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/d6bd9906
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/d6bd9906
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/d6bd9906

Branch: refs/heads/branch-0.8
Commit: d6bd9906546ca5adf069ae33f39b8b4afd8ddc84
Parents: 6bc4039
Author: Sarath Subramanian <[email protected]>
Authored: Tue Oct 2 15:37:45 2018 -0700
Committer: Sarath Subramanian <[email protected]>
Committed: Tue Oct 2 21:42:35 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasErrorCode.java   |   3 +-
 .../atlas/discovery/EntityLineageService.java   | 110 +++++++++++--------
 .../atlas/util/AtlasGremlin2QueryProvider.java  |  20 ++--
 .../atlas/util/AtlasGremlinQueryProvider.java   |   6 +-
 4 files changed, 81 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 8163390..ac82f25 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -92,8 +92,9 @@ public enum AtlasErrorCode {
     INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(404, "ATLAS-404-00-009", "Instance 
{0} with unique attribute {1} does not exist"),
     REFERENCED_ENTITY_NOT_FOUND(404, "ATLAS-404-00-00A", "Referenced entity 
{0} is not found"),
     INSTANCE_NOT_FOUND(404, "ATLAS-404-00-00B", "Given instance is invalid/not 
found: {0}"),
+    INVALID_LINEAGE_ENTITY_TYPE(404, "ATLAS-404-00-00C", "Given instance guid 
{0} with type {1} is not a valid lineage entity type."),
 
-     // All data conflict errors go here
+    // All data conflict errors go here
     TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already 
exists"),
     TYPE_HAS_REFERENCES(409, "ATLAS-409-00-002", "Given type {0} has 
references"),
     INSTANCE_ALREADY_EXISTS(409, "ATLAS-409-00-003", "failed to update entity: 
{0}"),

http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 3ae41c8..c928d35 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -19,7 +19,6 @@
 package org.apache.atlas.discovery;
 
 
-import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -27,25 +26,33 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
 import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
+import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_PROCESS;
+
 @Service
 public class EntityLineageService implements AtlasLineageService {
     private static final String INPUT_PROCESS_EDGE      =  "__Process.inputs";
@@ -54,30 +61,46 @@ public class EntityLineageService implements AtlasLineageService {
     private final AtlasGraph                graph;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private final EntityGraphRetriever      entityRetriever;
+    private final AtlasTypeRegistry         atlasTypeRegistry;
 
     @Inject
-    EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph) throws DiscoveryException {
+    EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph) {
         this.graph                = atlasGraph;
         this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
         this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
+        this.atlasTypeRegistry    = typeRegistry;
     }
 
     @Override
     @GraphTransaction
     public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection 
direction, int depth) throws AtlasBaseException {
-        AtlasLineageInfo lineageInfo;
+        AtlasLineageInfo  ret;
+        AtlasEntityHeader entity     = 
entityRetriever.toAtlasEntityHeader(guid);
+        AtlasEntityType   entityType = 
atlasTypeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
entity.getTypeName());
+        }
+
+        List<String> typeAndAllSuperTypes = 
getTypeAndAllSuperTypes(entityType);
 
-        if (!entityExists(guid)) {
-            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        boolean isDataSet = typeAndAllSuperTypes.contains(DATA_SET_SUPER_TYPE);
+
+        if (!isDataSet) {
+            boolean isProcess = 
typeAndAllSuperTypes.contains(PROCESS_SUPER_TYPE);
+
+            if (!isProcess) {
+                throw new 
AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, 
entity.getTypeName());
+            }
         }
 
         if (direction != null) {
-            if (direction.equals(LineageDirection.INPUT)) {
-                lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, 
depth);
-            } else if (direction.equals(LineageDirection.OUTPUT)) {
-                lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, 
depth);
-            } else if (direction.equals(LineageDirection.BOTH)) {
-                lineageInfo = getBothLineageInfo(guid, depth);
+            if (direction.equals(INPUT)) {
+                ret = getLineageInfo(guid, INPUT, depth, isDataSet);
+            } else if (direction.equals(OUTPUT)) {
+                ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
+            } else if (direction.equals(BOTH)) {
+                ret = getBothLineageInfo(guid, depth, isDataSet);
             } else {
                 throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", 
direction.toString());
             }
@@ -85,13 +108,13 @@ public class EntityLineageService implements AtlasLineageService {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", 
null);
         }
 
-        return lineageInfo;
+        return ret;
     }
 
-    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection 
direction, int depth) throws AtlasBaseException {
+    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection 
direction, int depth, boolean isDataSet) throws AtlasBaseException {
         Map<String, AtlasEntityHeader> entities     = new HashMap<>();
         Set<LineageRelation>           relations    = new HashSet<>();
-        String                         lineageQuery = getLineageQuery(guid, 
direction, depth);
+        String                         lineageQuery = getLineageQuery(guid, 
direction, depth, isDataSet);
 
         List paths = (List) graph.executeGremlinScript(lineageQuery, true);
 
@@ -115,9 +138,9 @@ public class EntityLineageService implements AtlasLineageService {
                             }
 
                             if (prev != null) {
-                                if (direction.equals(LineageDirection.INPUT)) {
+                                if (direction.equals(INPUT)) {
                                     relations.add(new 
LineageRelation(entity.getGuid(), prev.getGuid()));
-                                } else if 
(direction.equals(LineageDirection.OUTPUT)) {
+                                } else if (direction.equals(OUTPUT)) {
                                     relations.add(new 
LineageRelation(prev.getGuid(), entity.getGuid()));
                                 }
                             }
@@ -131,54 +154,55 @@ public class EntityLineageService implements AtlasLineageService {
         return new AtlasLineageInfo(guid, entities, relations, direction, 
depth);
     }
 
-    private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws 
AtlasBaseException {
-        AtlasLineageInfo inputLineage  = getLineageInfo(guid, 
LineageDirection.INPUT, depth);
-        AtlasLineageInfo outputLineage = getLineageInfo(guid, 
LineageDirection.OUTPUT, depth);
+    private AtlasLineageInfo getBothLineageInfo(String guid, int depth, 
boolean isDataSet) throws AtlasBaseException {
+        AtlasLineageInfo inputLineage  = getLineageInfo(guid, INPUT, depth, 
isDataSet);
+        AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, 
isDataSet);
         AtlasLineageInfo ret           = inputLineage;
 
         ret.getRelations().addAll(outputLineage.getRelations());
         ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap());
-        ret.setLineageDirection(LineageDirection.BOTH);
+        ret.setLineageDirection(BOTH);
 
         return ret;
     }
 
-    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth) throws AtlasBaseException {
-        String lineageQuery = null;
+    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth, boolean isDataSet) throws AtlasBaseException {
+        String ret = null;
 
-        if (direction.equals(LineageDirection.INPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, 
OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
+        if (direction.equals(INPUT)) {
+            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
 
-        } else if (direction.equals(LineageDirection.OUTPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, 
INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
+        } else if (direction.equals(OUTPUT)) {
+            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
         }
 
-        return lineageQuery;
+        return ret;
     }
 
-    private String generateLineageQuery(String entityGuid, int depth, String 
incomingFrom, String outgoingTo) {
+    private String generateLineageQuery(String entityGuid, int depth, boolean 
isDataSet, String incomingFrom, String outgoingTo) {
         String lineageQuery;
+
         if (depth < 1) {
-            String query = 
gremlinQueryProvider.getQuery(AtlasGremlinQuery.FULL_LINEAGE);
+            String query = isDataSet ? 
gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
             lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo);
         } else {
-            String query = 
gremlinQueryProvider.getQuery(AtlasGremlinQuery.PARTIAL_LINEAGE);
+            String query = isDataSet ? 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
             lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo, depth);
         }
         return lineageQuery;
     }
 
-    private boolean entityExists(String guid) {
-        boolean               ret     = false;
-        Iterator<AtlasVertex> results = graph.query()
-                                        .has(Constants.GUID_PROPERTY_KEY, guid)
-                                        .vertices().iterator();
+    private List<String> getTypeAndAllSuperTypes(AtlasEntityType entityType) {
+        List<String> ret = null;
+
+        if (entityType != null) {
+            ret = new ArrayList<>();
 
-        while (results.hasNext()) {
-            AtlasVertex  entityVertex = results.next();
-            List<String> superTypes   = 
GraphHelper.getSuperTypeNames(entityVertex);
+            ret.add(entityType.getTypeName());
 
-            ret = (CollectionUtils.isNotEmpty(superTypes)) ? 
superTypes.contains(AtlasClient.DATA_SET_SUPER_TYPE) : false;
+            if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) {
+                ret.addAll(entityType.getAllSuperTypes());
+            }
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index a68e3c3..2278aef 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -55,18 +55,14 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
                 return 
"g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()";
             case EXPORT_TYPE_DEFAULT:
                 return "g.V().has('__typeName',typeName).has(attrName, 
attrValue).has('__guid').__guid.toList()";
-            case FULL_LINEAGE:
-                return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
-                        "loop('src', {((it.path.contains(it.object)) ? false : 
true)}, " +
-                        "{((it.object.'__superTypeNames') ? " +
-                        "(it.object.'__superTypeNames'.contains('DataSet')) : 
false)})." +
-                        "path().toList()";
-            case PARTIAL_LINEAGE:
-                return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
-                        "loop('src', {it.loops <= %s}, 
{((it.object.'__superTypeNames') ? " +
-                        "(it.object.'__superTypeNames'.contains('DataSet')) : 
false)})." +
-                        "path().toList()";
-
+            case FULL_LINEAGE_DATASET:
+                return "g.V('__guid', 
'%s').as('src').in('%s').out('%s').loop('src', {((it.path.contains(it.object)) 
? false : true)}, {true}).path().toList()";
+            case FULL_LINEAGE_PROCESS:
+                return "g.V('__guid', 
'%s').as('src').out('%s').in('%s').loop('src', {((it.path.contains(it.object)) 
? false : true)}, {true}).path().toList()";
+            case PARTIAL_LINEAGE_DATASET:
+                return "g.V('__guid', 
'%s').as('src').in('%s').out('%s').loop('src', {it.loops <= %s}, 
{true}).path().toList()";
+            case PARTIAL_LINEAGE_PROCESS:
+                return "g.V('__guid', 
'%s').as('src').out('%s').in('%s').loop('src', {it.loops <= %s}, 
{true}).path().toList()";
             case BASIC_SEARCH_TYPE_FILTER:
                 return ".has('__typeName', T.in, typeNames)";
             case BASIC_SEARCH_CLASSIFICATION_FILTER:

http://git-wip-us.apache.org/repos/asf/atlas/blob/d6bd9906/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index 646628f..4c18a5d 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider {
         EXPORT_TYPE_DEFAULT,
 
         // Lineage Queries
-        FULL_LINEAGE,
-        PARTIAL_LINEAGE,
+        FULL_LINEAGE_DATASET,
+        FULL_LINEAGE_PROCESS,
+        PARTIAL_LINEAGE_DATASET,
+        PARTIAL_LINEAGE_PROCESS,
 
         // Discovery Queries
         BASIC_SEARCH_TYPE_FILTER,

Reply via email to