This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba5f191c ATLAS-4606: Improve Atlas Lineage API performance (fetch 
on-demand)
7ba5f191c is described below

commit 7ba5f191c5701f6fa850d18895951df6d20ab812
Author: radhikakundam <radhikakun...@apache.org>
AuthorDate: Mon Jul 25 16:42:38 2022 -0700

    ATLAS-4606: Improve Atlas Lineage API performance (fetch on-demand)
    
    Signed-off-by: radhikakundam <radhikakun...@apache.org>
---
 .../main/java/org/apache/atlas/AtlasClientV2.java  |   5 +
 .../java/org/apache/atlas/AtlasConfiguration.java  |   3 +
 .../main/java/org/apache/atlas/AtlasErrorCode.java |   2 +
 .../atlas/model/lineage/AtlasLineageInfo.java      | 175 +++++++++++-
 .../model/lineage/LineageOnDemandConstraints.java  |  92 ++++++
 .../test/resources/atlas-application.properties    |   3 +
 .../atlas/discovery/AtlasLineageService.java       |  10 +
 .../atlas/discovery/EntityLineageService.java      | 314 +++++++++++++++++++--
 .../apache/atlas/web/resources/AdminResource.java  |   8 +
 .../org/apache/atlas/web/rest/LineageREST.java     |  46 ++-
 .../atlas/web/integration/BaseResourceIT.java      |   9 +-
 .../atlas/web/integration/LineageClientV2IT.java   |  80 ++++++
 .../test/resources/atlas-application.properties    |   3 +
 13 files changed, 716 insertions(+), 34 deletions(-)

diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java 
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 85ca78bf9..6910b0e42 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -56,6 +56,7 @@ import 
org.apache.atlas.model.instance.ClassificationAssociateRequest;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
 import org.apache.atlas.model.profile.AtlasUserSavedSearch;
 import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
@@ -627,6 +628,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
         return callAPI(API_V2.GET_LINEAGE_BY_ATTRIBUTES, 
AtlasLineageInfo.class, queryParams, typeName);
     }
 
+    public AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String, 
LineageOnDemandConstraints> lineageConstraintsByGuid) throws 
AtlasServiceException {
+        return callAPI(API_V2.LINEAGE_INFO_ON_DEMAND, AtlasLineageInfo.class, 
lineageConstraintsByGuid, guid);
+    }
 
     /* Discovery APIs */
     public AtlasSearchResult dslSearch(String query) throws 
AtlasServiceException {
@@ -1206,6 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
         // Lineage APIs
         public static final API_V2 LINEAGE_INFO                = new 
API_V2(LINEAGE_URI, HttpMethod.GET, Response.Status.OK);
         public static final API_V2 GET_LINEAGE_BY_ATTRIBUTES   = new 
API_V2(LINEAGE_URI + "uniqueAttribute/type/", HttpMethod.GET, 
Response.Status.OK);
+        public static final API_V2 LINEAGE_INFO_ON_DEMAND      = new 
API_V2(LINEAGE_URI, HttpMethod.POST, Response.Status.OK);
 
         // Discovery APIs
         public static final API_V2 DSL_SEARCH                  = new 
API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 28fb68aef..e8c7a15ea 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -71,6 +71,9 @@ public enum AtlasConfiguration {
     IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
     MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 
0),
     LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false),
+    LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", false),
+    
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count",
 3),
+    LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 9000),
 
     HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
     
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
 true),
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java 
b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index ab35f8724..608342433 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -175,6 +175,7 @@ public enum AtlasErrorCode {
     ATTRIBUTE_NAME_ALREADY_EXISTS_IN_PARENT_TYPE(400, "ATLAS-400-00-09D", 
"Invalid attribute name: {0}.{1}. Attribute already exists in parent type: 
{2}"),
     ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400, 
"ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists 
in another parent type: {2}"),
     IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry. 
Reason: {1}"),
+    LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand 
config: {0} is not enabled"),
 
     UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to 
perform {1}"),
 
@@ -200,6 +201,7 @@ public enum AtlasErrorCode {
     NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", 
"No propagated classifications associated with entity: {0}"),
     FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be 
blank"),
     NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for 
given entity with guid: {0}"),
+    NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage 
constraints found for requested entity with guid : {0}"),
 
     METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request 
method {0} is inappropriate for the URL: {1}"),
 
diff --git 
a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java 
b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
index 27186ca7c..9ae6039f1 100644
--- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
@@ -20,6 +20,7 @@ package org.apache.atlas.model.lineage;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.atlas.model.instance.AtlasEntityHeader;
@@ -37,15 +38,19 @@ import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 
 @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonIgnoreProperties(ignoreUnknown = true, value = {"visitedEdges"})
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
 public class AtlasLineageInfo implements Serializable {
-    private String                         baseEntityGuid;
-    private LineageDirection               lineageDirection;
-    private int                            lineageDepth;
-    private Map<String, AtlasEntityHeader> guidEntityMap;
-    private Set<LineageRelation>           relations;
+
+    private String                                  baseEntityGuid;
+    private LineageDirection                        lineageDirection;
+    private int                                     lineageDepth;
+    private Map<String, AtlasEntityHeader>          guidEntityMap;
+    private Set<LineageRelation>                    relations;
+    private Set<String>                             visitedEdges;
+    private Map<String, LineageInfoOnDemand>        relationsOnDemand;
+    private Map<String, LineageOnDemandConstraints> lineageOnDemandPayload;
 
     public AtlasLineageInfo() {}
 
@@ -62,11 +67,18 @@ public class AtlasLineageInfo implements Serializable {
      */
     public AtlasLineageInfo(String baseEntityGuid, Map<String, 
AtlasEntityHeader> guidEntityMap,
                             Set<LineageRelation> relations, LineageDirection 
lineageDirection, int lineageDepth) {
-        this.baseEntityGuid   = baseEntityGuid;
-        this.lineageDirection = lineageDirection;
-        this.lineageDepth     = lineageDepth;
-        this.guidEntityMap    = guidEntityMap;
-        this.relations        = relations;
+        this(baseEntityGuid, guidEntityMap, relations, null, null, 
lineageDirection, lineageDepth);
+    }
+
+    public AtlasLineageInfo(String baseEntityGuid, Map<String, 
AtlasEntityHeader> guidEntityMap,
+                            Set<LineageRelation> relations, Set<String> 
visitedEdges, Map<String, LineageInfoOnDemand> relationsOnDemand, 
LineageDirection lineageDirection, int lineageDepth) {
+        this.baseEntityGuid               = baseEntityGuid;
+        this.lineageDirection             = lineageDirection;
+        this.lineageDepth                 = lineageDepth;
+        this.guidEntityMap                = guidEntityMap;
+        this.relations                    = relations;
+        this.visitedEdges                 = visitedEdges;
+        this.relationsOnDemand            = relationsOnDemand;
     }
 
     public String getBaseEntityGuid() {
@@ -93,6 +105,22 @@ public class AtlasLineageInfo implements Serializable {
         this.relations = relations;
     }
 
+    public Set<String> getVisitedEdges() {
+        return visitedEdges;
+    }
+
+    public void setVisitedEdges(Set<String> visitedEdges) {
+        this.visitedEdges = visitedEdges;
+    }
+
+    public Map<String, LineageInfoOnDemand> getRelationsOnDemand() {
+        return relationsOnDemand;
+    }
+
+    public void setRelationsOnDemand(Map<String, LineageInfoOnDemand> 
relationsOnDemand) {
+        this.relationsOnDemand = relationsOnDemand;
+    }
+
     public LineageDirection getLineageDirection() {
         return lineageDirection;
     }
@@ -109,6 +137,14 @@ public class AtlasLineageInfo implements Serializable {
         this.lineageDepth = lineageDepth;
     }
 
+    public Map<String, LineageOnDemandConstraints> getLineageOnDemandPayload() 
{
+        return lineageOnDemandPayload;
+    }
+
+    public void setLineageOnDemandPayload(Map<String, 
LineageOnDemandConstraints> lineageOnDemandPayload) {
+        this.lineageOnDemandPayload = lineageOnDemandPayload;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -132,6 +168,7 @@ public class AtlasLineageInfo implements Serializable {
                 "baseEntityGuid=" + baseEntityGuid +
                 ", guidEntityMap=" + guidEntityMap +
                 ", relations=" + relations +
+                ", relationsOnDemand=" + relationsOnDemand +
                 ", lineageDirection=" + lineageDirection +
                 ", lineageDepth=" + lineageDepth +
                 '}';
@@ -204,4 +241,120 @@ public class AtlasLineageInfo implements Serializable {
         }
     }
 
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown = true, value = 
{"inputRelationsReachedLimit", "outputRelationsReachedLimit"})
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class LineageInfoOnDemand {
+        @JsonProperty
+        boolean                    hasMoreInputs;
+        @JsonProperty
+        boolean                    hasMoreOutputs;
+        int                        inputRelationsCount;
+        int                        outputRelationsCount;
+        boolean                    isInputRelationsReachedLimit;
+        boolean                    isOutputRelationsReachedLimit;
+        LineageOnDemandConstraints onDemandConstraints;
+
+        public LineageInfoOnDemand() { }
+
+        public LineageInfoOnDemand(LineageOnDemandConstraints 
onDemandConstraints) {
+            this.onDemandConstraints           = onDemandConstraints;
+            this.hasMoreInputs                 = false;
+            this.hasMoreOutputs                = false;
+            this.inputRelationsCount           = 0;
+            this.outputRelationsCount          = 0;
+            this.isInputRelationsReachedLimit  = false;
+            this.isOutputRelationsReachedLimit = false;
+        }
+
+        public boolean isInputRelationsReachedLimit() {
+            return isInputRelationsReachedLimit;
+        }
+
+        public void setInputRelationsReachedLimit(boolean 
inputRelationsReachedLimit) {
+            isInputRelationsReachedLimit = inputRelationsReachedLimit;
+        }
+
+        public boolean isOutputRelationsReachedLimit() {
+            return isOutputRelationsReachedLimit;
+        }
+
+        public void setOutputRelationsReachedLimit(boolean 
outputRelationsReachedLimit) {
+            isOutputRelationsReachedLimit = outputRelationsReachedLimit;
+        }
+
+        public boolean hasMoreInputs() {
+            return hasMoreInputs;
+        }
+
+        public void setHasMoreInputs(boolean hasMoreInputs) {
+            this.hasMoreInputs = hasMoreInputs;
+        }
+
+        public boolean hasMoreOutputs() {
+            return hasMoreOutputs;
+        }
+
+        public void setHasMoreOutputs(boolean hasMoreOutputs) {
+            this.hasMoreOutputs = hasMoreOutputs;
+        }
+
+        public int getInputRelationsCount() {
+            return inputRelationsCount;
+        }
+
+        public void incrementInputRelationsCount() {
+            if (hasMoreInputs) {
+                return;
+            }
+
+            if (isInputRelationsReachedLimit) {
+                setHasMoreInputs(true);
+                return;
+            }
+
+            this.inputRelationsCount++;
+
+            if (inputRelationsCount == 
onDemandConstraints.getInputRelationsLimit()) {
+                this.setInputRelationsReachedLimit(true);
+                return;
+            }
+        }
+
+        public int getOutputRelationsCount() {
+            return outputRelationsCount;
+        }
+
+        public void incrementOutputRelationsCount() {
+            if (hasMoreOutputs) {
+                return;
+            }
+
+            if (isOutputRelationsReachedLimit) {
+                setHasMoreOutputs(true);
+                return;
+            }
+
+            this.outputRelationsCount++;
+
+            if (outputRelationsCount == 
onDemandConstraints.getOutputRelationsLimit()) {
+                this.setOutputRelationsReachedLimit(true);
+                return;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "LineageInfoOnDemand{" +
+                    "hasMoreInputs='" + hasMoreInputs + '\'' +
+                    ", hasMoreOutputs='" + hasMoreOutputs + '\'' +
+                    ", inputRelationsCount='" + inputRelationsCount + '\'' +
+                    ", outputRelationsCount='" + outputRelationsCount + '\'' +
+                    '}';
+        }
+
+    }
+
 }
\ No newline at end of file
diff --git 
a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
 
b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
new file mode 100644
index 000000000..0b3ea2418
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.lineage;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+
+import java.io.Serializable;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/**
+ * This is the root class representing the input for lineage search on-demand.
+ */
+public class LineageOnDemandConstraints implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private LineageDirection direction;
+    private int              inputRelationsLimit;
+    private int              outputRelationsLimit;
+    private int              depth;
+
+    private static final int LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT = 
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
+    private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH      = 3;
+
+    public LineageOnDemandConstraints() {
+        this(LineageDirection.BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, 
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+    }
+
+    public LineageOnDemandConstraints(LineageDirection direction, int 
inputRelationsLimit, int outputRelationsLimit, int depth) {
+        this.direction            = direction;
+        this.inputRelationsLimit  = inputRelationsLimit;
+        this.outputRelationsLimit = outputRelationsLimit;
+        this.depth                = depth;
+    }
+
+    public LineageDirection getDirection() {
+        return direction;
+    }
+
+    public void setDirection(LineageDirection direction) {
+        this.direction = direction;
+    }
+
+    public int getInputRelationsLimit() {
+        return inputRelationsLimit;
+    }
+
+    public void setInputRelationsLimit(int inputRelationsLimit) {
+        this.inputRelationsLimit = inputRelationsLimit;
+    }
+
+    public int getOutputRelationsLimit() {
+        return outputRelationsLimit;
+    }
+
+    public void setOutputRelationsLimit(int outputRelationsLimit) {
+        this.outputRelationsLimit = outputRelationsLimit;
+    }
+
+    public int getDepth() {
+        return depth;
+    }
+
+    public void setDepth(int depth) {
+        this.depth = depth;
+    }
+
+}
diff --git a/intg/src/test/resources/atlas-application.properties 
b/intg/src/test/resources/atlas-application.properties
index 2139cec24..1156abcca 100644
--- a/intg/src/test/resources/atlas-application.properties
+++ b/intg/src/test/resources/atlas-application.properties
@@ -146,3 +146,6 @@ atlas.search.gremlin.enable=true
 
 #########  Configure use of Tasks  #########
 atlas.tasks.enabled=false
+
+#########  Configure on-demand lineage  #########
+atlas.lineage.on.demand.enabled=true
diff --git 
a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java 
b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
index 8dc6d3a19..0b518b904 100644
--- 
a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
+++ 
b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
@@ -22,8 +22,11 @@ package org.apache.atlas.discovery;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
 import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
 
+import java.util.Map;
+
 public interface AtlasLineageService {
     /**
      * @param entityGuid unique ID of the entity
@@ -33,6 +36,13 @@ public interface AtlasLineageService {
      */
     AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection 
direction, int depth) throws AtlasBaseException;
 
+    /**
+     * @param entityGuid unique ID of the entity
+     * @param lineageOnDemandConstraintsByGuid map of constraints to fetch 
lineage for each guid
+     * @return AtlasLineageInfo
+     */
+    AtlasLineageInfo getAtlasLineageInfo(String entityGuid, Map<String, 
LineageOnDemandConstraints> lineageOnDemandConstraintsByGuid) throws 
AtlasBaseException;
+
     /**
      * Return the schema for the given datasetName.
      *
diff --git 
a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java 
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index d18ace841..aa881f2bd 100644
--- 
a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++ 
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -31,8 +31,10 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageInfoOnDemand;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -59,6 +61,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -80,10 +83,14 @@ import static 
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.
 public class EntityLineageService implements AtlasLineageService {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityLineageService.class);
 
-    private static final String  PROCESS_INPUTS_EDGE   = "__Process.inputs";
-    private static final String  PROCESS_OUTPUTS_EDGE  = "__Process.outputs";
-    private static final String  COLUMNS               = "columns";
-    private static final boolean LINEAGE_USING_GREMLIN = 
AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
+    private static final String  PROCESS_INPUTS_EDGE                  = 
"__Process.inputs";
+    private static final String  PROCESS_OUTPUTS_EDGE                 = 
"__Process.outputs";
+    private static final String  COLUMNS                              = 
"columns";
+    private static final boolean LINEAGE_USING_GREMLIN                = 
AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
+    private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT       = 9000;
+    private static final int     LINEAGE_ON_DEMAND_DEFAULT_DEPTH      = 3;
+    private static final int     LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT = 
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
+    private static final String  SEPARATOR                            = "->";
 
     private final AtlasGraph                graph;
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -103,6 +110,40 @@ public class EntityLineageService implements 
AtlasLineageService {
     public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection 
direction, int depth) throws AtlasBaseException {
         AtlasLineageInfo ret;
 
+        boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
+
+        if (LINEAGE_USING_GREMLIN) {
+            ret = getLineageInfoV1(guid, direction, depth, isDataSet);
+        } else {
+            ret = getLineageInfoV2(guid, direction, depth, isDataSet);
+        }
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
+    public AtlasLineageInfo getAtlasLineageInfo(String guid, Map<String, 
LineageOnDemandConstraints> lineageConstraintsMap) throws AtlasBaseException {
+        AtlasLineageInfo ret;
+
+        if (MapUtils.isEmpty(lineageConstraintsMap)) {
+            lineageConstraintsMap = new HashMap<>();
+            lineageConstraintsMap.put(guid, 
getDefaultLineageConstraints(guid));
+        }
+
+        boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
+
+        ret = getLineageInfoOnDemand(guid, lineageConstraintsMap, isDataSet);
+
+        appendLineageOnDemandPayload(ret, lineageConstraintsMap);
+
+        // filtering out on-demand relations which has input & output nodes 
within the limit
+        cleanupRelationsOnDemand(ret);
+
+        return ret;
+    }
+
+    private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws 
AtlasBaseException {
         AtlasEntityHeader entity = 
entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
 
         AtlasAuthorizationUtils.verifyAccess(new 
AtlasEntityAccessRequest(atlasTypeRegistry, AtlasPrivilege.ENTITY_READ, 
entity), "read entity lineage: guid=", guid);
@@ -123,13 +164,21 @@ public class EntityLineageService implements 
AtlasLineageService {
             }
         }
 
-        if (LINEAGE_USING_GREMLIN) {
-            ret = getLineageInfoV1(guid, direction, depth, isDataSet);
-        } else {
-            ret = getLineageInfoV2(guid, direction, depth, isDataSet);
+        return isDataSet;
+    }
+
+    private void appendLineageOnDemandPayload(AtlasLineageInfo lineageInfo, 
Map<String, LineageOnDemandConstraints> lineageConstraintsMap) {
+        if (lineageInfo == null || MapUtils.isEmpty(lineageConstraintsMap)) {
+            return;
         }
+        lineageInfo.setLineageOnDemandPayload(lineageConstraintsMap);
+    }
 
-        return ret;
+    //Consider only relationsOnDemand which has either more inputs or more 
outputs than given limit
+    private void cleanupRelationsOnDemand(AtlasLineageInfo lineageInfo) {
+        if (lineageInfo != null && 
MapUtils.isNotEmpty(lineageInfo.getRelationsOnDemand())) {
+            lineageInfo.getRelationsOnDemand().entrySet().removeIf(x -> 
!(x.getValue().hasMoreInputs() || x.getValue().hasMoreOutputs()));
+        }
     }
 
     @Override
@@ -300,6 +349,102 @@ public class EntityLineageService implements 
AtlasLineageService {
         return ret;
     }
 
+    private LineageOnDemandConstraints getDefaultLineageConstraints(String 
guid) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("No lineage on-demand constraints provided for guid: {}, 
configuring with default values direction: {}, inputRelationsLimit: {}, 
outputRelationsLimit: {}, depth: {}",
+                    guid, BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, 
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+        }
+
+        return new LineageOnDemandConstraints();
+    }
+
+    private LineageOnDemandConstraints 
getAndValidateLineageConstraintsByGuid(String guid, Map<String, 
LineageOnDemandConstraints> lineageConstraintsMap) {
+
+        if (lineageConstraintsMap == null || 
!lineageConstraintsMap.containsKey(guid)) {
+            return getDefaultLineageConstraints(guid);
+        }
+
+        LineageOnDemandConstraints lineageConstraintsByGuid = 
lineageConstraintsMap.get(guid);;
+        if (lineageConstraintsByGuid == null) {
+            return getDefaultLineageConstraints(guid);
+        }
+
+        if (Objects.isNull(lineageConstraintsByGuid.getDirection())) {
+            LOG.info("No lineage on-demand direction provided for guid: {}, 
configuring with default value {}", guid, LineageDirection.BOTH);
+            lineageConstraintsByGuid.setDirection(BOTH);
+        }
+
+        if (lineageConstraintsByGuid.getInputRelationsLimit() == 0) {
+            LOG.info("No lineage on-demand inputRelationsLimit provided for 
guid: {}, configuring with default value {}", guid, 
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+            
lineageConstraintsByGuid.setInputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+        }
+
+        if (lineageConstraintsByGuid.getOutputRelationsLimit() == 0) {
+            LOG.info("No lineage on-demand outputRelationsLimit provided for 
guid: {}, configuring with default value {}", guid, 
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+            
lineageConstraintsByGuid.setOutputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+        }
+
+        if (lineageConstraintsByGuid.getDepth() == 0) {
+            LOG.info("No lineage on-demand depth provided for guid: {}, 
configuring with default value {}", guid, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+            lineageConstraintsByGuid.setDepth(LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+        }
+
+        return lineageConstraintsByGuid;
+
+    }
+
+    private AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String, 
LineageOnDemandConstraints> lineageConstraintsMap, boolean isDataSet) throws 
AtlasBaseException {
+
+        LineageOnDemandConstraints lineageConstraintsByGuid = 
getAndValidateLineageConstraintsByGuid(guid, lineageConstraintsMap);
+
+        if (lineageConstraintsByGuid == null) {
+            throw new 
AtlasBaseException(AtlasErrorCode.NO_LINEAGE_CONSTRAINTS_FOR_GUID, guid);
+        }
+
+        LineageDirection direction = lineageConstraintsByGuid.getDirection();
+        int              depth     = lineageConstraintsByGuid.getDepth();
+
+        AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth);
+
+        if (depth == 0) {
+            depth = -1;
+        }
+
+        if (isDataSet) {
+            AtlasVertex datasetVertex = 
AtlasGraphUtilsV2.findByGuid(this.graph, guid);
+
+            if (!ret.getRelationsOnDemand().containsKey(guid)) {
+                ret.getRelationsOnDemand().put(guid, new 
LineageInfoOnDemand(lineageConstraintsByGuid));
+            }
+
+            if (direction == INPUT || direction == BOTH) {
+                traverseEdgesOnDemand(datasetVertex, true, depth, new 
HashSet<>(), lineageConstraintsMap, ret);
+            }
+
+            if (direction == OUTPUT || direction == BOTH) {
+                traverseEdgesOnDemand(datasetVertex, false, depth, new 
HashSet<>(), lineageConstraintsMap, ret);
+            }
+        } else  {
+            AtlasVertex processVertex = 
AtlasGraphUtilsV2.findByGuid(this.graph, guid);
+
+            // make one hop to the next dataset vertices from process vertex 
and traverse with 'depth = depth - 1'
+            if (direction == INPUT || direction == BOTH) {
+                Iterable<AtlasEdge> processEdges = 
processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE);
+
+                traverseEdgesOnDemand(processEdges, true, depth, 
lineageConstraintsMap, ret);
+            }
+
+            if (direction == OUTPUT || direction == BOTH) {
+                Iterable<AtlasEdge> processEdges = 
processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE);
+
+                traverseEdgesOnDemand(processEdges, false, depth, 
lineageConstraintsMap, ret);
+            }
+
+        }
+
+        return ret;
+    }
+
     private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int 
depth, AtlasLineageInfo ret) throws AtlasBaseException {
         traverseEdges(datasetVertex, isInput, depth, new HashSet<>(), ret);
     }
@@ -331,24 +476,139 @@ public class EntityLineageService implements 
AtlasLineageService {
         }
     }
 
+    private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges, 
boolean isInput, int depth, Map<String, LineageOnDemandConstraints> 
lineageConstraintsMap, AtlasLineageInfo ret) throws AtlasBaseException {
+        for (AtlasEdge processEdge : processEdges) {
+            boolean isInputEdge  = 
processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+
+            if (incrementAndCheckIfRelationsLimitReached(processEdge, 
isInputEdge, lineageConstraintsMap, ret)) {
+                break;
+            } else {
+                addEdgeToResult(processEdge, ret);
+            }
+
+            AtlasVertex datasetVertex = processEdge.getInVertex();
+
+            String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex);
+            LineageOnDemandConstraints inGuidLineageConstrains = 
getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap);
+
+            if (!ret.getRelationsOnDemand().containsKey(inGuid)) {
+                ret.getRelationsOnDemand().put(inGuid, new 
LineageInfoOnDemand(inGuidLineageConstrains));
+            }
+
+            traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new 
HashSet<>(), lineageConstraintsMap, ret);
+        }
+    }
+
+    private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean 
isInput, int depth, Set<String> visitedVertices, Map<String, 
LineageOnDemandConstraints> lineageConstraintsMap, AtlasLineageInfo ret) throws 
AtlasBaseException {
+        if (depth != 0) {
+            // keep track of visited vertices to avoid circular loop
+            visitedVertices.add(getId(datasetVertex));
+
+            Iterable<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, 
isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE);
+
+            for (AtlasEdge incomingEdge : incomingEdges) {
+
+                if (incrementAndCheckIfRelationsLimitReached(incomingEdge, 
!isInput, lineageConstraintsMap, ret)) {
+                    break;
+                } else {
+                    addEdgeToResult(incomingEdge, ret);
+                }
+
+                AtlasVertex         processVertex = 
incomingEdge.getOutVertex();
+                Iterable<AtlasEdge> outgoingEdges = 
processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : 
PROCESS_OUTPUTS_EDGE);
+
+                for (AtlasEdge outgoingEdge : outgoingEdges) {
+
+                    if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, 
isInput, lineageConstraintsMap, ret)) {
+                        break;
+                    } else {
+                        addEdgeToResult(outgoingEdge, ret);
+                    }
+
+                    AtlasVertex entityVertex = outgoingEdge.getInVertex();
+
+                    if (entityVertex != null && 
!visitedVertices.contains(getId(entityVertex))) {
+                        traverseEdgesOnDemand(entityVertex, isInput, depth - 
1, visitedVertices, lineageConstraintsMap, ret);
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge 
atlasEdge, boolean isInput, Map<String, LineageOnDemandConstraints> 
lineageConstraintsMap, AtlasLineageInfo ret) {
+
+        if (lineageContainsEdge(ret, atlasEdge)) {
+            return false;
+        }
+
+        boolean hasRelationsLimitReached = false;
+
+        AtlasVertex                inVertex                 = isInput ? 
atlasEdge.getOutVertex() : atlasEdge.getInVertex();
+        String                     inGuid                   = 
AtlasGraphUtilsV2.getIdFromVertex(inVertex);
+        LineageOnDemandConstraints inGuidLineageConstraints = 
getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap);
+
+        AtlasVertex                outVertex                 = isInput ? 
atlasEdge.getInVertex() : atlasEdge.getOutVertex();
+        String                     outGuid                   = 
AtlasGraphUtilsV2.getIdFromVertex(outVertex);
+        LineageOnDemandConstraints outGuidLineageConstraints = 
getAndValidateLineageConstraintsByGuid(outGuid, lineageConstraintsMap);
+
+        LineageInfoOnDemand inLineageInfo = 
ret.getRelationsOnDemand().containsKey(inGuid) ? 
ret.getRelationsOnDemand().get(inGuid) : new 
LineageInfoOnDemand(inGuidLineageConstraints);
+        LineageInfoOnDemand outLineageInfo = 
ret.getRelationsOnDemand().containsKey(outGuid) ? 
ret.getRelationsOnDemand().get(outGuid) : new 
LineageInfoOnDemand(outGuidLineageConstraints);
+
+        if (inLineageInfo.isInputRelationsReachedLimit()) {
+            inLineageInfo.setHasMoreInputs(true);
+            hasRelationsLimitReached = true;
+        }else {
+            inLineageInfo.incrementInputRelationsCount();
+        }
+
+        if (outLineageInfo.isOutputRelationsReachedLimit()) {
+            outLineageInfo.setHasMoreOutputs(true);
+            hasRelationsLimitReached = true;
+        } else {
+            outLineageInfo.incrementOutputRelationsCount();
+        }
+
+        if (!hasRelationsLimitReached) {
+            ret.getRelationsOnDemand().put(inGuid, inLineageInfo);
+            ret.getRelationsOnDemand().put(outGuid, outLineageInfo);
+        }
+
+        return hasRelationsLimitReached;
+    }
+
     private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo) 
throws AtlasBaseException {
-        if (!lineageContainsEdge(lineageInfo, edge)) {
+        if (!lineageContainsEdge(lineageInfo, edge) && 
!lineageMaxNodeCountReached(lineageInfo.getRelations())) {
             processEdge(edge, lineageInfo);
         }
     }
 
+    private int getLineageMaxNodeAllowedCount() {
+        return Math.min(DEFAULT_LINEAGE_MAX_NODE_COUNT, 
AtlasConfiguration.LINEAGE_MAX_NODE_COUNT.getInt());
+    }
+
+    private boolean lineageMaxNodeCountReached(Set<LineageRelation> relations) 
{
+        return CollectionUtils.isNotEmpty(relations) && relations.size() > 
getLineageMaxNodeAllowedCount();
+    }
+
+    private String getVisitedEdgeLabel(String inGuid, String outGuid, String 
relationGuid) {
+        if (isLineageOnDemandEnabled()) {
+            return inGuid + SEPARATOR + outGuid;
+        }
+        return relationGuid;
+    }
+
     private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo, 
AtlasEdge edge) {
         boolean ret = false;
 
-        if (lineageInfo != null && 
CollectionUtils.isNotEmpty(lineageInfo.getRelations()) && edge != null) {
-            String               relationGuid = 
AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, 
String.class);
-            Set<LineageRelation> relations    = lineageInfo.getRelations();
+        if (edge != null && lineageInfo != null && 
CollectionUtils.isNotEmpty(lineageInfo.getVisitedEdges())) {
+            String  inGuid           = 
AtlasGraphUtilsV2.getIdFromVertex(edge.getInVertex());
+            String  outGuid          = 
AtlasGraphUtilsV2.getIdFromVertex(edge.getOutVertex());
+            String  relationGuid     = 
AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, 
String.class);
+            boolean isInputEdge      = 
edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+            String  visitedEdgeLabel = isInputEdge ? 
getVisitedEdgeLabel(inGuid, outGuid, relationGuid) : 
getVisitedEdgeLabel(outGuid, inGuid, relationGuid);
 
-            for (LineageRelation relation : relations) {
-                if (relation.getRelationshipId().equals(relationGuid)) {
-                    ret = true;
-                    break;
-                }
+            if (lineageInfo.getVisitedEdges().contains(visitedEdgeLabel)) {
+                ret = true;
             }
         }
 
@@ -356,11 +616,11 @@ public class EntityLineageService implements 
AtlasLineageService {
     }
 
     private void processEdge(final AtlasEdge edge, final AtlasLineageInfo 
lineageInfo) throws AtlasBaseException {
-        processEdge(edge, lineageInfo.getGuidEntityMap(), 
lineageInfo.getRelations());
+        processEdge(edge, lineageInfo.getGuidEntityMap(), 
lineageInfo.getRelations(), lineageInfo.getVisitedEdges());
     }
 
     private AtlasLineageInfo initializeLineageInfo(String guid, 
LineageDirection direction, int depth) {
-        return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), 
direction, depth);
+        return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), 
new HashSet<>(), new HashMap<>(), direction, depth);
     }
 
     private static String getId(AtlasVertex vertex) {
@@ -383,6 +643,10 @@ public class EntityLineageService implements 
AtlasLineageService {
     }
 
     private void processEdge(final AtlasEdge edge, final Map<String, 
AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws 
AtlasBaseException {
+        processEdge(edge, entities, relations, null);
+    }
+
+    private void processEdge(final AtlasEdge edge, final Map<String, 
AtlasEntityHeader> entities, final Set<LineageRelation> relations, final 
Set<String> visitedEdges) throws AtlasBaseException {
         AtlasVertex inVertex     = edge.getInVertex();
         AtlasVertex outVertex    = edge.getOutVertex();
         String      inGuid       = AtlasGraphUtilsV2.getIdFromVertex(inVertex);
@@ -390,6 +654,7 @@ public class EntityLineageService implements 
AtlasLineageService {
         String      relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, 
RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
         boolean     isInputEdge  = 
edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
 
+
         if (!entities.containsKey(inGuid)) {
             AtlasEntityHeader entityHeader = 
entityRetriever.toAtlasEntityHeader(inVertex);
             entities.put(inGuid, entityHeader);
@@ -405,6 +670,11 @@ public class EntityLineageService implements 
AtlasLineageService {
         } else {
             relations.add(new LineageRelation(outGuid, inGuid, relationGuid));
         }
+
+        if (visitedEdges != null) {
+            String visitedEdgeLabel = isInputEdge ? 
getVisitedEdgeLabel(inGuid, outGuid, relationGuid) : 
getVisitedEdgeLabel(outGuid, inGuid, relationGuid);
+            visitedEdges.add(visitedEdgeLabel);
+        }
     }
 
     private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth, 
boolean isDataSet) throws AtlasBaseException {
@@ -448,4 +718,8 @@ public class EntityLineageService implements 
AtlasLineageService {
 
         return ret;
     }
+
+    public boolean isLineageOnDemandEnabled() {
+        return AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean();
+    }
 }
\ No newline at end of file
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index d55ada77e..b19095b48 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -184,6 +184,8 @@ public class AdminResource {
     private final  AtlasDebugMetricsSink    debugMetricsRESTSink;
     private final  boolean                  isDebugMetricsEnabled;
     private final  boolean                  isTasksEnabled;
+    private final  boolean                  isOnDemandLineageEnabled;
+    private final  int                      defaultLineageNodeCount;
 
     static {
         try {
@@ -224,12 +226,16 @@ public class AdminResource {
             this.uiDateFormat = atlasProperties.getString(UI_DATE_FORMAT, 
UI_DATE_DEFAULT_FORMAT);
             this.isDebugMetricsEnabled = 
AtlasConfiguration.DEBUG_METRICS_ENABLED.getBoolean();
             this.isTasksEnabled = 
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
+            this.isOnDemandLineageEnabled = 
AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean();
+            this.defaultLineageNodeCount = 
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
         } else {
             this.defaultUIVersion = UI_VERSION_V2;
             this.isTimezoneFormatEnabled = true;
             this.uiDateFormat = UI_DATE_DEFAULT_FORMAT;
             this.isDebugMetricsEnabled = false;
             this.isTasksEnabled = false;
+            this.isOnDemandLineageEnabled = false;
+            this.defaultLineageNodeCount = 3;
         }
     }
 
@@ -379,6 +385,8 @@ public class AdminResource {
         responseData.put(UI_DATE_FORMAT, uiDateFormat);
         
responseData.put(AtlasConfiguration.DEBUG_METRICS_ENABLED.getPropertyName(), 
isDebugMetricsEnabled);
         
responseData.put(AtlasConfiguration.TASKS_USE_ENABLED.getPropertyName(), 
isTasksEnabled);
+        
responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(),
 isOnDemandLineageEnabled);
+        
responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getPropertyName(),
 defaultLineageNodeCount);
 
         if (AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt() != -1) {
             
responseData.put(AtlasConfiguration.SESSION_TIMEOUT_SECS.getPropertyName(), 
AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt());
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java 
b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
index 30b562607..c66200c9b 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.web.rest;
 
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.annotation.Timed;
 import org.apache.atlas.discovery.AtlasLineageService;
@@ -26,6 +27,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
 import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -33,6 +35,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.inject.Inject;
@@ -43,6 +46,7 @@ import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.POST;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
@@ -59,13 +63,15 @@ import java.util.Map;
 @Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
 @Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
 public class LineageREST {
+    private static final Logger LOG      = 
LoggerFactory.getLogger(LineageREST.class);
     private static final Logger PERF_LOG = 
AtlasPerfTracer.getPerfLogger("rest.LineageREST");
     private static final String PREFIX_ATTR = "attr:";
 
     private final AtlasTypeRegistry typeRegistry;
     private final AtlasLineageService atlasLineageService;
-    private static final String DEFAULT_DIRECTION = "BOTH";
-    private static final String DEFAULT_DEPTH     = "3";
+
+    private static final String  DEFAULT_DIRECTION         = "BOTH";
+    private static final String  DEFAULT_DEPTH             = "3";
 
     @Context
     private HttpServletRequest httpServletRequest;
@@ -109,6 +115,42 @@ public class LineageREST {
         }
     }
 
+    /**
+     * Returns lineage info about entity.
+     * @return AtlasLineageInfo
+     * @throws AtlasBaseException
+     * @HTTP 200 If Lineage exists for the given entity
+     * @HTTP 400 Bad query parameters
+     * @HTTP 404 If no lineage is found for the given entity
+     */
+    @POST
+    @Path("/{guid}")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Timed
+    public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid,
+                                            Map<String, 
LineageOnDemandConstraints> lineageConstraintsMapByGuid) throws 
AtlasBaseException {
+        if (!AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean()) {
+            LOG.warn("LineageREST: "+ 
AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName()));
+
+            throw new 
AtlasBaseException(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED, 
AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName());
+        }
+
+        Servlets.validateQueryParamLength("guid", guid);
+
+        AtlasPerfTracer  perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"LineageREST.getLineageGraph(" + guid + "," + lineageConstraintsMapByGuid + 
")");
+            }
+
+            return atlasLineageService.getAtlasLineageInfo(guid, 
lineageConstraintsMapByGuid);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     /**
      * Returns lineage info about entity.
      *
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java 
b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index 9d5a9e5c3..c2ccec795 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -104,6 +104,7 @@ public abstract class BaseResourceIT {
     public static final String SEC_TAG            = "sec_Tag";
     public static final String FINANCE_TAG        = "finance_Tag";
     public static final String CLASSIFICATION     = "classification";
+    public static final String ATLAS_LINEAGE_ON_DEMAND_ENABLED = 
"atlas.lineage.on.demand.enabled";
 
     protected static final int MAX_WAIT_TIME = 60000;
 
@@ -111,6 +112,7 @@ public abstract class BaseResourceIT {
     protected AtlasClient   atlasClientV1;
     protected AtlasClientV2 atlasClientV2;
     protected String[]      atlasUrls;
+    protected boolean       isLineageOnDemandEnabled;
 
 
     protected NotificationInterface notificationInterface = null;
@@ -123,11 +125,16 @@ public abstract class BaseResourceIT {
         
ApplicationProperties.get().setProperty("atlas.client.readTimeoutMSecs", 
"100000000");
         
ApplicationProperties.get().setProperty("atlas.client.connectTimeoutMSecs", 
"100000000");
 
-
         Configuration configuration = ApplicationProperties.get();
 
+        isLineageOnDemandEnabled = 
configuration.getBoolean(ATLAS_LINEAGE_ON_DEMAND_ENABLED);
+        if (!isLineageOnDemandEnabled) {
+            
ApplicationProperties.get().setProperty(ATLAS_LINEAGE_ON_DEMAND_ENABLED, true);
+        }
+
         atlasUrls = configuration.getStringArray(ATLAS_REST_ADDRESS);
 
+
         if (atlasUrls == null || atlasUrls.length == 0) {
             atlasUrls = new String[] { "http://localhost:21000/"; };
         }
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java 
b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
index e7b67f21e..e2c86bcc4 100644
--- 
a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
@@ -18,8 +18,12 @@
 
 package org.apache.atlas.web.integration;
 
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
 import org.apache.atlas.v1.model.instance.Id;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.slf4j.Logger;
@@ -44,6 +48,7 @@ public class LineageClientV2IT extends 
DataSetLineageJerseyResourceIT {
     private static final Logger LOG = 
LoggerFactory.getLogger(LineageClientV2IT.class);
     private String salesFactTable;
     private String salesMonthlyTable;
+    private String salesMonthlyTableOnDemand;
     private String salesDBName;
 
     @BeforeClass
@@ -74,6 +79,67 @@ public class LineageClientV2IT extends 
DataSetLineageJerseyResourceIT {
         Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
     }
 
+    @Test
+    public void testGetLineageInfoOnDemand() throws Exception {
+        String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE,
+                REFERENCEABLE_ATTRIBUTE_NAME, 
salesMonthlyTableOnDemand).getId()._getId();
+
+        //Get entire Lineage Info
+        AtlasLineageInfo inputLineageInfo = 
atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 
5);
+        Assert.assertNotNull(inputLineageInfo);
+        Map<String, AtlasEntityHeader> entities = 
inputLineageInfo.getGuidEntityMap();
+        Assert.assertNotNull(entities);
+
+        Set<AtlasLineageInfo.LineageRelation> relations = 
inputLineageInfo.getRelations();
+        Assert.assertNotNull(relations);
+
+        Map<String, AtlasLineageInfo.LineageInfoOnDemand> relationsOnDemand = 
inputLineageInfo.getRelationsOnDemand();
+        Assert.assertTrue(relationsOnDemand == null || 
relationsOnDemand.size() == 0);
+
+        Assert.assertEquals(entities.size(), 21);
+        Assert.assertEquals(relations.size(), 20);
+        Assert.assertEquals(inputLineageInfo.getLineageDirection(), 
AtlasLineageInfo.LineageDirection.INPUT);
+        Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5);
+        Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
+
+        //Get lineage info on-demand with input and output limit as 3
+        
ApplicationProperties.get().setProperty(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(),
 true);
+
+        LineageOnDemandConstraints lineageConstraints = new 
LineageOnDemandConstraints(AtlasLineageInfo.LineageDirection.INPUT, 3, 3, 5);
+        Map<String, LineageOnDemandConstraints> lineageConstraintsByGuid = new 
HashMap<>();
+        lineageConstraintsByGuid.put(tableId, lineageConstraints);
+
+        if (!isLineageOnDemandEnabled) {
+            
Assert.fail(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(ATLAS_LINEAGE_ON_DEMAND_ENABLED));
+        }
+
+        AtlasLineageInfo inputLineageInfoOnDemand = 
atlasClientV2.getLineageInfoOnDemand(tableId, lineageConstraintsByGuid);
+        Assert.assertNotNull(inputLineageInfoOnDemand);
+        entities = inputLineageInfoOnDemand.getGuidEntityMap();
+        Assert.assertNotNull(entities);
+
+        relations = inputLineageInfoOnDemand.getRelations();
+        Assert.assertNotNull(relations);
+
+        relationsOnDemand = inputLineageInfoOnDemand.getRelationsOnDemand();
+        Assert.assertNotNull(relationsOnDemand);
+
+        Assert.assertEquals(entities.size(), 7);
+        Assert.assertEquals(relations.size(), 6);
+        Assert.assertEquals(relationsOnDemand.size(), 1);
+
+        Assert.assertEquals(inputLineageInfoOnDemand.getLineageDirection(), 
AtlasLineageInfo.LineageDirection.INPUT);
+        Assert.assertEquals(inputLineageInfoOnDemand.getLineageDepth(), 5);
+        Assert.assertEquals(inputLineageInfoOnDemand.getBaseEntityGuid(), 
tableId);
+
+        AtlasLineageInfo.LineageInfoOnDemand relationsOnDemandByTableId = 
relationsOnDemand.get(tableId);
+        Assert.assertNotNull(relationsOnDemandByTableId);
+
+        boolean hasMoreInputs = relationsOnDemandByTableId.hasMoreInputs();
+        Assert.assertTrue(hasMoreInputs);
+
+    }
+
     @Test
     public void testGetLineageInfoByAttribute() throws Exception {
         Map<String, String> attributeMap = new HashMap<>();
@@ -129,5 +195,19 @@ public class LineageClientV2IT extends 
DataSetLineageJerseyResourceIT {
 
         loadProcess("loadSalesMonthly" + randomString(), "John ETL", 
Collections.singletonList(salesFactDaily),
                 Collections.singletonList(salesFactMonthly), "create table as 
select ", "plan", "id", "graph");
+
+        salesMonthlyTableOnDemand = "sales_fact_monthly_mv_on_demand_" + 
randomString();
+        Id salesFactMonthlyOnDemand =
+                table(salesMonthlyTableOnDemand, "sales fact monthly 
materialized view", salesDB, "Jane BI",
+                        "MANAGED", salesFactColumns);
+
+        for (int i = 1; i <= 10; i++) {
+            Id salesFactDailyOnDemand =
+                    table("sales_fact_daily_mv_on_demand_" + randomString() + 
"_" + i, "sales fact daily materialized view -"+i, salesDB,
+                            "Joe BI", "MANAGED", salesFactColumns);
+
+            loadProcess("loadSalesMonthly" + randomString() + "_" + i, "John 
ETL", Collections.singletonList(salesFactDailyOnDemand),
+                    Collections.singletonList(salesFactMonthlyOnDemand), 
"create table as select ", "plan", "id", "graph");
+        }
     }
 }
diff --git a/webapp/src/test/resources/atlas-application.properties 
b/webapp/src/test/resources/atlas-application.properties
index 2f433295d..b863ed84b 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -131,3 +131,6 @@ atlas.search.gremlin.enable=true
 #########  Configure use of Tasks  #########
 atlas.tasks.enabled=false
 atlas.debug.metrics.enabled=true
+
+#########  Configure on-demand lineage  #########
+atlas.lineage.on.demand.enabled=true

Reply via email to