Repository: atlas
Updated Branches:
  refs/heads/master d5f46e3f5 -> 8cc12be18


ATLAS-2905: Generate lineage information for process entities


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8cc12be1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8cc12be1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8cc12be1

Branch: refs/heads/master
Commit: 8cc12be185d388798a9de4e1d798e69de9795374
Parents: d5f46e3
Author: Sarath Subramanian <ssubraman...@hortonworks.com>
Authored: Tue Oct 2 15:37:45 2018 -0700
Committer: Sarath Subramanian <ssubraman...@hortonworks.com>
Committed: Tue Oct 2 15:37:45 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasErrorCode.java   |  2 +-
 .../atlas/discovery/EntityLineageService.java   | 75 ++++++++++++--------
 .../atlas/util/AtlasGremlin2QueryProvider.java  |  4 +-
 .../atlas/util/AtlasGremlin3QueryProvider.java  |  8 ++-
 .../atlas/util/AtlasGremlinQueryProvider.java   |  6 +-
 5 files changed, 60 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 47726aa..2fe389c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -173,7 +173,7 @@ public enum AtlasErrorCode {
     RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", 
"RelationshipDef {0} endDef typename {0} cannot be found"),
     RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to 
delete a relationship which is already deleted : {0}"),
     INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", 
"Updating entityGuid of classification is not allowed."),
-    INSTANCE_GUID_NOT_DATASET(404, "ATLAS-404-00-011", "Given instance guid 
{0} is not a dataset"),
+    INVALID_LINEAGE_ENTITY_TYPE(404, "ATLAS-404-00-011", "Given instance guid 
{0} with type {1} is not a valid lineage entity type."),
     INSTANCE_GUID_DELETED(404, "ATLAS-404-00-012", "Given instance guid {0} 
has been deleted"),
     NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", 
"No propagated classifications associated with entity: {0}"),
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index 5d02ea5..6f2f97b 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -19,7 +19,6 @@
 package org.apache.atlas.discovery;
 
 
-import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -33,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
-import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -61,7 +59,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
+import static org.apache.atlas.AtlasClient.PROCESS_SUPER_TYPE;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.BOTH;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.INPUT;
+import static 
org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.OUTPUT;
 import static 
org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_DATASET;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.FULL_LINEAGE_PROCESS;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_DATASET;
+import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.PARTIAL_LINEAGE_PROCESS;
 
 @Service
 public class EntityLineageService implements AtlasLineageService {
@@ -87,7 +94,7 @@ public class EntityLineageService implements AtlasLineageService {
     @Override
     @GraphTransaction
     public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection 
direction, int depth) throws AtlasBaseException {
-        AtlasLineageInfo lineageInfo;
+        AtlasLineageInfo ret;
 
         AtlasEntityHeader entity = 
entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
 
@@ -95,17 +102,28 @@ public class EntityLineageService implements AtlasLineageService {
 
         AtlasEntityType entityType = 
atlasTypeRegistry.getEntityTypeByName(entity.getTypeName());
 
-        if (entityType == null || 
!entityType.getTypeAndAllSuperTypes().contains(AtlasClient.DATA_SET_SUPER_TYPE))
 {
-            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_DATASET, guid);
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, 
entity.getTypeName());
+        }
+
+        boolean isDataSet = 
entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE);
+
+        if (!isDataSet) {
+            boolean isProcess = 
entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE);
+
+            if (!isProcess) {
+                throw new 
AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, 
entity.getTypeName());
+            }
+
         }
 
         if (direction != null) {
-            if (direction.equals(LineageDirection.INPUT)) {
-                lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, 
depth);
-            } else if (direction.equals(LineageDirection.OUTPUT)) {
-                lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, 
depth);
-            } else if (direction.equals(LineageDirection.BOTH)) {
-                lineageInfo = getBothLineageInfo(guid, depth);
+            if (direction.equals(INPUT)) {
+                ret = getLineageInfo(guid, INPUT, depth, isDataSet);
+            } else if (direction.equals(OUTPUT)) {
+                ret = getLineageInfo(guid, OUTPUT, depth, isDataSet);
+            } else if (direction.equals(BOTH)) {
+                ret = getBothLineageInfo(guid, depth, isDataSet);
             } else {
                 throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", 
direction.toString());
             }
@@ -113,7 +131,7 @@ public class EntityLineageService implements AtlasLineageService {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", 
null);
         }
 
-        return lineageInfo;
+        return ret;
     }
 
     @Override
@@ -184,10 +202,10 @@ public class EntityLineageService implements AtlasLineageService {
         return columnIds.contains(e.getValue().getGuid());
     }
 
-    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection 
direction, int depth) throws AtlasBaseException {
+    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection 
direction, int depth, boolean isDataSet) throws AtlasBaseException {
         Map<String, AtlasEntityHeader> entities     = new HashMap<>();
         Set<LineageRelation>           relations    = new HashSet<>();
-        String                         lineageQuery = getLineageQuery(guid, 
direction, depth);
+        String                         lineageQuery = getLineageQuery(guid, 
direction, depth, isDataSet);
 
         List edgeMapList = (List) graph.executeGremlinScript(lineageQuery, 
false);
 
@@ -244,38 +262,39 @@ public class EntityLineageService implements AtlasLineageService {
         }
     }
 
-    private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws 
AtlasBaseException {
-        AtlasLineageInfo inputLineage  = getLineageInfo(guid, 
LineageDirection.INPUT, depth);
-        AtlasLineageInfo outputLineage = getLineageInfo(guid, 
LineageDirection.OUTPUT, depth);
+    private AtlasLineageInfo getBothLineageInfo(String guid, int depth, 
boolean isDataSet) throws AtlasBaseException {
+        AtlasLineageInfo inputLineage  = getLineageInfo(guid, INPUT, depth, 
isDataSet);
+        AtlasLineageInfo outputLineage = getLineageInfo(guid, OUTPUT, depth, 
isDataSet);
         AtlasLineageInfo ret           = inputLineage;
 
         ret.getRelations().addAll(outputLineage.getRelations());
         ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap());
-        ret.setLineageDirection(LineageDirection.BOTH);
+        ret.setLineageDirection(BOTH);
 
         return ret;
     }
 
-    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth) {
-        String lineageQuery = null;
+    private String getLineageQuery(String entityGuid, LineageDirection 
direction, int depth, boolean isDataSet) {
+        String ret = null;
 
-        if (direction.equals(LineageDirection.INPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, 
PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
+        if (direction.equals(INPUT)) {
+            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
PROCESS_OUTPUTS_EDGE, PROCESS_INPUTS_EDGE);
 
-        } else if (direction.equals(LineageDirection.OUTPUT)) {
-            lineageQuery = generateLineageQuery(entityGuid, depth, 
PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
+        } else if (direction.equals(OUTPUT)) {
+            ret = generateLineageQuery(entityGuid, depth, isDataSet, 
PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE);
         }
 
-        return lineageQuery;
+        return ret;
     }
 
-    private String generateLineageQuery(String entityGuid, int depth, String 
incomingFrom, String outgoingTo) {
+    private String generateLineageQuery(String entityGuid, int depth, boolean 
isDataSet, String incomingFrom, String outgoingTo) {
         String lineageQuery;
+
         if (depth < 1) {
-            String query = 
gremlinQueryProvider.getQuery(AtlasGremlinQuery.FULL_LINEAGE);
+            String query = isDataSet ? 
gremlinQueryProvider.getQuery(FULL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(FULL_LINEAGE_PROCESS);
             lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo);
         } else {
-            String query = 
gremlinQueryProvider.getQuery(AtlasGremlinQuery.PARTIAL_LINEAGE);
+            String query = isDataSet ? 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_DATASET) : 
gremlinQueryProvider.getQuery(PARTIAL_LINEAGE_PROCESS);
             lineageQuery = String.format(query, entityGuid, incomingFrom, 
outgoingTo, depth);
         }
         return lineageQuery;

http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index 82243ad..4be8ce3 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -44,13 +44,13 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
                 return 
"g.V().has('__typeName',typeName).filter({it.getProperty(attrName).matches(attrValue)}).has('__guid').__guid.toList()";
             case EXPORT_TYPE_DEFAULT:
                 return "g.V().has('__typeName',typeName).has(attrName, 
attrValue).has('__guid').__guid.toList()";
-            case FULL_LINEAGE:
+            case FULL_LINEAGE_DATASET:
                 return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
                         "loop('src', {((it.path.contains(it.object)) ? false : 
true)}, " +
                         "{((it.object.'__superTypeNames') ? " +
                         "(it.object.'__superTypeNames'.contains('DataSet')) : 
false)})." +
                         "path().toList()";
-            case PARTIAL_LINEAGE:
+            case PARTIAL_LINEAGE_DATASET:
                 return "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
                         "loop('src', {it.loops <= %s}, 
{((it.object.'__superTypeNames') ? " +
                         "(it.object.'__superTypeNames'.contains('DataSet')) : 
false)})." +

http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
index 9585a57..866e5af 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin3QueryProvider.java
@@ -45,10 +45,14 @@ public class AtlasGremlin3QueryProvider extends AtlasGremlin2QueryProvider {
                 return "g.V().has('__guid', 
startGuid).outE().inV().has('__guid').project('__guid', 
'isProcess').by('__guid').by(map 
{it.get().values('__superTypeNames').toSet().contains('Process')}).dedup().toList()";
             case EXPORT_TYPE_ALL_FOR_TYPE:
                 return "g.V().has('__typeName', 
within(typeName)).has('__guid').values('__guid').toList()";
-            case FULL_LINEAGE:
+            case FULL_LINEAGE_DATASET:
                 return "g.V().has('__guid', 
'%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).emit().select('e1',
 'e2').toList()";
-            case PARTIAL_LINEAGE:
+            case PARTIAL_LINEAGE_DATASET:
                 return "g.V().has('__guid', 
'%s').repeat(__.inE('%s').as('e1').outV().outE('%s').as('e2').inV()).times(%s).emit().select('e1',
 'e2').toList()";
+            case FULL_LINEAGE_PROCESS:
+                return "g.V().has('__guid', 
'%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).emit().select('e1',
 'e2').toList()";
+            case PARTIAL_LINEAGE_PROCESS:
+                return "g.V().has('__guid', 
'%s').repeat(__.outE('%s').as('e1').inV().inE('%s').as('e2').outV()).times(%s).emit().select('e1',
 'e2').toList()";
             case TO_RANGE_LIST:
                 return ".range(startIdx, endIdx).toList()";
             case RELATIONSHIP_SEARCH:

http://git-wip-us.apache.org/repos/asf/atlas/blob/8cc12be1/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index 8555b4c..d201db3 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -53,8 +53,10 @@ public abstract class AtlasGremlinQueryProvider {
         EXPORT_TYPE_DEFAULT,
 
         // Lineage Queries
-        FULL_LINEAGE,
-        PARTIAL_LINEAGE,
+        FULL_LINEAGE_DATASET,
+        FULL_LINEAGE_PROCESS,
+        PARTIAL_LINEAGE_DATASET,
+        PARTIAL_LINEAGE_PROCESS,
 
         // Discovery Queries
         BASIC_SEARCH_TYPE_FILTER,

Reply via email to