This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 33d16202e ATLAS-4606: Improve Atlas Lineage API performance (fetch on-demand)
33d16202e is described below
commit 33d16202e5d67976eb71524c296a83ea10ae1808
Author: radhikakundam <[email protected]>
AuthorDate: Mon Jul 25 16:42:38 2022 -0700
ATLAS-4606: Improve Atlas Lineage API performance (fetch on-demand)
Signed-off-by: radhikakundam <[email protected]>
(cherry picked from commit 7ba5f191c5701f6fa850d18895951df6d20ab812)
---
.../main/java/org/apache/atlas/AtlasClientV2.java | 5 +
.../java/org/apache/atlas/AtlasConfiguration.java | 3 +
.../main/java/org/apache/atlas/AtlasErrorCode.java | 2 +
.../atlas/model/lineage/AtlasLineageInfo.java | 175 +++++++++++-
.../model/lineage/LineageOnDemandConstraints.java | 92 ++++++
.../test/resources/atlas-application.properties | 3 +
.../atlas/discovery/AtlasLineageService.java | 10 +
.../atlas/discovery/EntityLineageService.java | 314 +++++++++++++++++++--
.../apache/atlas/web/resources/AdminResource.java | 8 +
.../org/apache/atlas/web/rest/LineageREST.java | 46 ++-
.../atlas/web/integration/BaseResourceIT.java | 9 +-
.../atlas/web/integration/LineageClientV2IT.java | 80 ++++++
.../test/resources/atlas-application.properties | 3 +
13 files changed, 716 insertions(+), 34 deletions(-)
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 85ca78bf9..6910b0e42 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -56,6 +56,7 @@ import
org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
@@ -627,6 +628,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.GET_LINEAGE_BY_ATTRIBUTES,
AtlasLineageInfo.class, queryParams, typeName);
}
+ public AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String,
LineageOnDemandConstraints> lineageConstraintsByGuid) throws
AtlasServiceException {
+ return callAPI(API_V2.LINEAGE_INFO_ON_DEMAND, AtlasLineageInfo.class,
lineageConstraintsByGuid, guid);
+ }
/* Discovery APIs */
public AtlasSearchResult dslSearch(String query) throws
AtlasServiceException {
@@ -1206,6 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Lineage APIs
public static final API_V2 LINEAGE_INFO = new
API_V2(LINEAGE_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 GET_LINEAGE_BY_ATTRIBUTES = new
API_V2(LINEAGE_URI + "uniqueAttribute/type/", HttpMethod.GET,
Response.Status.OK);
+ public static final API_V2 LINEAGE_INFO_ON_DEMAND = new
API_V2(LINEAGE_URI, HttpMethod.POST, Response.Status.OK);
// Discovery APIs
public static final API_V2 DSL_SEARCH = new
API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 28fb68aef..e8c7a15ea 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -71,6 +71,9 @@ public enum AtlasConfiguration {
IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position",
0),
LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false),
+ LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", false),
+
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count",
3),
+ LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 9000),
HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
true),
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index ab35f8724..608342433 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -175,6 +175,7 @@ public enum AtlasErrorCode {
ATTRIBUTE_NAME_ALREADY_EXISTS_IN_PARENT_TYPE(400, "ATLAS-400-00-09D",
"Invalid attribute name: {0}.{1}. Attribute already exists in parent type:
{2}"),
ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400,
"ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists
in another parent type: {2}"),
IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry.
Reason: {1}"),
+ LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand
config: {0} is not enabled"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to
perform {1}"),
@@ -200,6 +201,7 @@ public enum AtlasErrorCode {
NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013",
"No propagated classifications associated with entity: {0}"),
FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be
blank"),
NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for
given entity with guid: {0}"),
+ NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage
constraints found for requested entity with guid : {0}"),
METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request
method {0} is inappropriate for the URL: {1}"),
diff --git
a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
index 27186ca7c..9ae6039f1 100644
--- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
+++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java
@@ -20,6 +20,7 @@ package org.apache.atlas.model.lineage;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.instance.AtlasEntityHeader;
@@ -37,15 +38,19 @@ import static
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility =
PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonIgnoreProperties(ignoreUnknown = true, value = {"visitedEdges"})
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasLineageInfo implements Serializable {
- private String baseEntityGuid;
- private LineageDirection lineageDirection;
- private int lineageDepth;
- private Map<String, AtlasEntityHeader> guidEntityMap;
- private Set<LineageRelation> relations;
+
+ private String baseEntityGuid;
+ private LineageDirection lineageDirection;
+ private int lineageDepth;
+ private Map<String, AtlasEntityHeader> guidEntityMap;
+ private Set<LineageRelation> relations;
+ private Set<String> visitedEdges;
+ private Map<String, LineageInfoOnDemand> relationsOnDemand;
+ private Map<String, LineageOnDemandConstraints> lineageOnDemandPayload;
public AtlasLineageInfo() {}
@@ -62,11 +67,18 @@ public class AtlasLineageInfo implements Serializable {
*/
public AtlasLineageInfo(String baseEntityGuid, Map<String,
AtlasEntityHeader> guidEntityMap,
Set<LineageRelation> relations, LineageDirection
lineageDirection, int lineageDepth) {
- this.baseEntityGuid = baseEntityGuid;
- this.lineageDirection = lineageDirection;
- this.lineageDepth = lineageDepth;
- this.guidEntityMap = guidEntityMap;
- this.relations = relations;
+ this(baseEntityGuid, guidEntityMap, relations, null, null,
lineageDirection, lineageDepth);
+ }
+
+ public AtlasLineageInfo(String baseEntityGuid, Map<String,
AtlasEntityHeader> guidEntityMap,
+ Set<LineageRelation> relations, Set<String>
visitedEdges, Map<String, LineageInfoOnDemand> relationsOnDemand,
LineageDirection lineageDirection, int lineageDepth) {
+ this.baseEntityGuid = baseEntityGuid;
+ this.lineageDirection = lineageDirection;
+ this.lineageDepth = lineageDepth;
+ this.guidEntityMap = guidEntityMap;
+ this.relations = relations;
+ this.visitedEdges = visitedEdges;
+ this.relationsOnDemand = relationsOnDemand;
}
public String getBaseEntityGuid() {
@@ -93,6 +105,22 @@ public class AtlasLineageInfo implements Serializable {
this.relations = relations;
}
+ public Set<String> getVisitedEdges() {
+ return visitedEdges;
+ }
+
+ public void setVisitedEdges(Set<String> visitedEdges) {
+ this.visitedEdges = visitedEdges;
+ }
+
+ public Map<String, LineageInfoOnDemand> getRelationsOnDemand() {
+ return relationsOnDemand;
+ }
+
+ public void setRelationsOnDemand(Map<String, LineageInfoOnDemand>
relationsOnDemand) {
+ this.relationsOnDemand = relationsOnDemand;
+ }
+
public LineageDirection getLineageDirection() {
return lineageDirection;
}
@@ -109,6 +137,14 @@ public class AtlasLineageInfo implements Serializable {
this.lineageDepth = lineageDepth;
}
+ public Map<String, LineageOnDemandConstraints> getLineageOnDemandPayload()
{
+ return lineageOnDemandPayload;
+ }
+
+ public void setLineageOnDemandPayload(Map<String,
LineageOnDemandConstraints> lineageOnDemandPayload) {
+ this.lineageOnDemandPayload = lineageOnDemandPayload;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -132,6 +168,7 @@ public class AtlasLineageInfo implements Serializable {
"baseEntityGuid=" + baseEntityGuid +
", guidEntityMap=" + guidEntityMap +
", relations=" + relations +
+ ", relationsOnDemand=" + relationsOnDemand +
", lineageDirection=" + lineageDirection +
", lineageDepth=" + lineageDepth +
'}';
@@ -204,4 +241,120 @@ public class AtlasLineageInfo implements Serializable {
}
}
+ @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility =
PUBLIC_ONLY, fieldVisibility = NONE)
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ @JsonIgnoreProperties(ignoreUnknown = true, value =
{"inputRelationsReachedLimit", "outputRelationsReachedLimit"})
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class LineageInfoOnDemand {
+ @JsonProperty
+ boolean hasMoreInputs;
+ @JsonProperty
+ boolean hasMoreOutputs;
+ int inputRelationsCount;
+ int outputRelationsCount;
+ boolean isInputRelationsReachedLimit;
+ boolean isOutputRelationsReachedLimit;
+ LineageOnDemandConstraints onDemandConstraints;
+
+ public LineageInfoOnDemand() { }
+
+ public LineageInfoOnDemand(LineageOnDemandConstraints
onDemandConstraints) {
+ this.onDemandConstraints = onDemandConstraints;
+ this.hasMoreInputs = false;
+ this.hasMoreOutputs = false;
+ this.inputRelationsCount = 0;
+ this.outputRelationsCount = 0;
+ this.isInputRelationsReachedLimit = false;
+ this.isOutputRelationsReachedLimit = false;
+ }
+
+ public boolean isInputRelationsReachedLimit() {
+ return isInputRelationsReachedLimit;
+ }
+
+ public void setInputRelationsReachedLimit(boolean
inputRelationsReachedLimit) {
+ isInputRelationsReachedLimit = inputRelationsReachedLimit;
+ }
+
+ public boolean isOutputRelationsReachedLimit() {
+ return isOutputRelationsReachedLimit;
+ }
+
+ public void setOutputRelationsReachedLimit(boolean
outputRelationsReachedLimit) {
+ isOutputRelationsReachedLimit = outputRelationsReachedLimit;
+ }
+
+ public boolean hasMoreInputs() {
+ return hasMoreInputs;
+ }
+
+ public void setHasMoreInputs(boolean hasMoreInputs) {
+ this.hasMoreInputs = hasMoreInputs;
+ }
+
+ public boolean hasMoreOutputs() {
+ return hasMoreOutputs;
+ }
+
+ public void setHasMoreOutputs(boolean hasMoreOutputs) {
+ this.hasMoreOutputs = hasMoreOutputs;
+ }
+
+ public int getInputRelationsCount() {
+ return inputRelationsCount;
+ }
+
+ public void incrementInputRelationsCount() {
+ if (hasMoreInputs) {
+ return;
+ }
+
+ if (isInputRelationsReachedLimit) {
+ setHasMoreInputs(true);
+ return;
+ }
+
+ this.inputRelationsCount++;
+
+ if (inputRelationsCount ==
onDemandConstraints.getInputRelationsLimit()) {
+ this.setInputRelationsReachedLimit(true);
+ return;
+ }
+ }
+
+ public int getOutputRelationsCount() {
+ return outputRelationsCount;
+ }
+
+ public void incrementOutputRelationsCount() {
+ if (hasMoreOutputs) {
+ return;
+ }
+
+ if (isOutputRelationsReachedLimit) {
+ setHasMoreOutputs(true);
+ return;
+ }
+
+ this.outputRelationsCount++;
+
+ if (outputRelationsCount ==
onDemandConstraints.getOutputRelationsLimit()) {
+ this.setOutputRelationsReachedLimit(true);
+ return;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "LineageInfoOnDemand{" +
+ "hasMoreInputs='" + hasMoreInputs + '\'' +
+ ", hasMoreOutputs='" + hasMoreOutputs + '\'' +
+ ", inputRelationsCount='" + inputRelationsCount + '\'' +
+ ", outputRelationsCount='" + outputRelationsCount + '\'' +
+ '}';
+ }
+
+ }
+
}
\ No newline at end of file
diff --git
a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
new file mode 100644
index 000000000..0b3ea2418
--- /dev/null
+++
b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.lineage;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+
+import java.io.Serializable;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility =
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/**
+ * This is the root class representing the input for lineage search on-demand.
+ */
+public class LineageOnDemandConstraints implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private LineageDirection direction;
+ private int inputRelationsLimit;
+ private int outputRelationsLimit;
+ private int depth;
+
+ private static final int LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT =
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
+ private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3;
+
+ public LineageOnDemandConstraints() {
+ this(LineageDirection.BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT,
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+ }
+
+ public LineageOnDemandConstraints(LineageDirection direction, int
inputRelationsLimit, int outputRelationsLimit, int depth) {
+ this.direction = direction;
+ this.inputRelationsLimit = inputRelationsLimit;
+ this.outputRelationsLimit = outputRelationsLimit;
+ this.depth = depth;
+ }
+
+ public LineageDirection getDirection() {
+ return direction;
+ }
+
+ public void setDirection(LineageDirection direction) {
+ this.direction = direction;
+ }
+
+ public int getInputRelationsLimit() {
+ return inputRelationsLimit;
+ }
+
+ public void setInputRelationsLimit(int inputRelationsLimit) {
+ this.inputRelationsLimit = inputRelationsLimit;
+ }
+
+ public int getOutputRelationsLimit() {
+ return outputRelationsLimit;
+ }
+
+ public void setOutputRelationsLimit(int outputRelationsLimit) {
+ this.outputRelationsLimit = outputRelationsLimit;
+ }
+
+ public int getDepth() {
+ return depth;
+ }
+
+ public void setDepth(int depth) {
+ this.depth = depth;
+ }
+
+}
diff --git a/intg/src/test/resources/atlas-application.properties
b/intg/src/test/resources/atlas-application.properties
index 2139cec24..1156abcca 100644
--- a/intg/src/test/resources/atlas-application.properties
+++ b/intg/src/test/resources/atlas-application.properties
@@ -146,3 +146,6 @@ atlas.search.gremlin.enable=true
######### Configure use of Tasks #########
atlas.tasks.enabled=false
+
+######### Configure on-demand lineage #########
+atlas.lineage.on.demand.enabled=true
diff --git
a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
index 8dc6d3a19..0b518b904 100644
---
a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
+++
b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java
@@ -22,8 +22,11 @@ package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails;
+import java.util.Map;
+
public interface AtlasLineageService {
/**
* @param entityGuid unique ID of the entity
@@ -33,6 +36,13 @@ public interface AtlasLineageService {
*/
AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection
direction, int depth) throws AtlasBaseException;
+ /**
+ * @param entityGuid unique ID of the entity
+ * @param lineageOnDemandConstraintsByGuid map of constraints to fetch
lineage for each guid
+ * @return AtlasLineageInfo
+ */
+ AtlasLineageInfo getAtlasLineageInfo(String entityGuid, Map<String,
LineageOnDemandConstraints> lineageOnDemandConstraintsByGuid) throws
AtlasBaseException;
+
/**
* Return the schema for the given datasetName.
*
diff --git
a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
index d18ace841..aa881f2bd 100644
---
a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
+++
b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java
@@ -31,8 +31,10 @@ import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageInfoOnDemand;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -59,6 +61,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -80,10 +83,14 @@ import static
org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.
public class EntityLineageService implements AtlasLineageService {
private static final Logger LOG =
LoggerFactory.getLogger(EntityLineageService.class);
- private static final String PROCESS_INPUTS_EDGE = "__Process.inputs";
- private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs";
- private static final String COLUMNS = "columns";
- private static final boolean LINEAGE_USING_GREMLIN =
AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
+ private static final String PROCESS_INPUTS_EDGE =
"__Process.inputs";
+ private static final String PROCESS_OUTPUTS_EDGE =
"__Process.outputs";
+ private static final String COLUMNS =
"columns";
+ private static final boolean LINEAGE_USING_GREMLIN =
AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean();
+ private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000;
+ private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3;
+ private static final int LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT =
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
+ private static final String SEPARATOR = "->";
private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
@@ -103,6 +110,40 @@ public class EntityLineageService implements
AtlasLineageService {
public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection
direction, int depth) throws AtlasBaseException {
AtlasLineageInfo ret;
+ boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
+
+ if (LINEAGE_USING_GREMLIN) {
+ ret = getLineageInfoV1(guid, direction, depth, isDataSet);
+ } else {
+ ret = getLineageInfoV2(guid, direction, depth, isDataSet);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasLineageInfo getAtlasLineageInfo(String guid, Map<String,
LineageOnDemandConstraints> lineageConstraintsMap) throws AtlasBaseException {
+ AtlasLineageInfo ret;
+
+ if (MapUtils.isEmpty(lineageConstraintsMap)) {
+ lineageConstraintsMap = new HashMap<>();
+ lineageConstraintsMap.put(guid,
getDefaultLineageConstraints(guid));
+ }
+
+ boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid);
+
+ ret = getLineageInfoOnDemand(guid, lineageConstraintsMap, isDataSet);
+
+ appendLineageOnDemandPayload(ret, lineageConstraintsMap);
+
+ // filtering out on-demand relations which has input & output nodes
within the limit
+ cleanupRelationsOnDemand(ret);
+
+ return ret;
+ }
+
+ private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws
AtlasBaseException {
AtlasEntityHeader entity =
entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
AtlasAuthorizationUtils.verifyAccess(new
AtlasEntityAccessRequest(atlasTypeRegistry, AtlasPrivilege.ENTITY_READ,
entity), "read entity lineage: guid=", guid);
@@ -123,13 +164,21 @@ public class EntityLineageService implements
AtlasLineageService {
}
}
- if (LINEAGE_USING_GREMLIN) {
- ret = getLineageInfoV1(guid, direction, depth, isDataSet);
- } else {
- ret = getLineageInfoV2(guid, direction, depth, isDataSet);
+ return isDataSet;
+ }
+
+ private void appendLineageOnDemandPayload(AtlasLineageInfo lineageInfo,
Map<String, LineageOnDemandConstraints> lineageConstraintsMap) {
+ if (lineageInfo == null || MapUtils.isEmpty(lineageConstraintsMap)) {
+ return;
}
+ lineageInfo.setLineageOnDemandPayload(lineageConstraintsMap);
+ }
- return ret;
+ //Consider only relationsOnDemand which has either more inputs or more
outputs than given limit
+ private void cleanupRelationsOnDemand(AtlasLineageInfo lineageInfo) {
+ if (lineageInfo != null &&
MapUtils.isNotEmpty(lineageInfo.getRelationsOnDemand())) {
+ lineageInfo.getRelationsOnDemand().entrySet().removeIf(x ->
!(x.getValue().hasMoreInputs() || x.getValue().hasMoreOutputs()));
+ }
}
@Override
@@ -300,6 +349,102 @@ public class EntityLineageService implements
AtlasLineageService {
return ret;
}
+ private LineageOnDemandConstraints getDefaultLineageConstraints(String
guid) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No lineage on-demand constraints provided for guid: {},
configuring with default values direction: {}, inputRelationsLimit: {},
outputRelationsLimit: {}, depth: {}",
+ guid, BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT,
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+ }
+
+ return new LineageOnDemandConstraints();
+ }
+
+ private LineageOnDemandConstraints
getAndValidateLineageConstraintsByGuid(String guid, Map<String,
LineageOnDemandConstraints> lineageConstraintsMap) {
+
+ if (lineageConstraintsMap == null ||
!lineageConstraintsMap.containsKey(guid)) {
+ return getDefaultLineageConstraints(guid);
+ }
+
+ LineageOnDemandConstraints lineageConstraintsByGuid =
lineageConstraintsMap.get(guid);;
+ if (lineageConstraintsByGuid == null) {
+ return getDefaultLineageConstraints(guid);
+ }
+
+ if (Objects.isNull(lineageConstraintsByGuid.getDirection())) {
+ LOG.info("No lineage on-demand direction provided for guid: {},
configuring with default value {}", guid, LineageDirection.BOTH);
+ lineageConstraintsByGuid.setDirection(BOTH);
+ }
+
+ if (lineageConstraintsByGuid.getInputRelationsLimit() == 0) {
+ LOG.info("No lineage on-demand inputRelationsLimit provided for
guid: {}, configuring with default value {}", guid,
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+
lineageConstraintsByGuid.setInputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+ }
+
+ if (lineageConstraintsByGuid.getOutputRelationsLimit() == 0) {
+ LOG.info("No lineage on-demand outputRelationsLimit provided for
guid: {}, configuring with default value {}", guid,
LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+
lineageConstraintsByGuid.setOutputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT);
+ }
+
+ if (lineageConstraintsByGuid.getDepth() == 0) {
+ LOG.info("No lineage on-demand depth provided for guid: {},
configuring with default value {}", guid, LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+ lineageConstraintsByGuid.setDepth(LINEAGE_ON_DEMAND_DEFAULT_DEPTH);
+ }
+
+ return lineageConstraintsByGuid;
+
+ }
+
+ private AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String,
LineageOnDemandConstraints> lineageConstraintsMap, boolean isDataSet) throws
AtlasBaseException {
+
+ LineageOnDemandConstraints lineageConstraintsByGuid =
getAndValidateLineageConstraintsByGuid(guid, lineageConstraintsMap);
+
+ if (lineageConstraintsByGuid == null) {
+ throw new
AtlasBaseException(AtlasErrorCode.NO_LINEAGE_CONSTRAINTS_FOR_GUID, guid);
+ }
+
+ LineageDirection direction = lineageConstraintsByGuid.getDirection();
+ int depth = lineageConstraintsByGuid.getDepth();
+
+ AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth);
+
+ if (depth == 0) {
+ depth = -1;
+ }
+
+ if (isDataSet) {
+ AtlasVertex datasetVertex =
AtlasGraphUtilsV2.findByGuid(this.graph, guid);
+
+ if (!ret.getRelationsOnDemand().containsKey(guid)) {
+ ret.getRelationsOnDemand().put(guid, new
LineageInfoOnDemand(lineageConstraintsByGuid));
+ }
+
+ if (direction == INPUT || direction == BOTH) {
+ traverseEdgesOnDemand(datasetVertex, true, depth, new
HashSet<>(), lineageConstraintsMap, ret);
+ }
+
+ if (direction == OUTPUT || direction == BOTH) {
+ traverseEdgesOnDemand(datasetVertex, false, depth, new
HashSet<>(), lineageConstraintsMap, ret);
+ }
+ } else {
+ AtlasVertex processVertex =
AtlasGraphUtilsV2.findByGuid(this.graph, guid);
+
+ // make one hop to the next dataset vertices from process vertex
and traverse with 'depth = depth - 1'
+ if (direction == INPUT || direction == BOTH) {
+ Iterable<AtlasEdge> processEdges =
processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE);
+
+ traverseEdgesOnDemand(processEdges, true, depth,
lineageConstraintsMap, ret);
+ }
+
+ if (direction == OUTPUT || direction == BOTH) {
+ Iterable<AtlasEdge> processEdges =
processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE);
+
+ traverseEdgesOnDemand(processEdges, false, depth,
lineageConstraintsMap, ret);
+ }
+
+ }
+
+ return ret;
+ }
+
private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int
depth, AtlasLineageInfo ret) throws AtlasBaseException {
traverseEdges(datasetVertex, isInput, depth, new HashSet<>(), ret);
}
@@ -331,24 +476,139 @@ public class EntityLineageService implements
AtlasLineageService {
}
}
+ private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges,
boolean isInput, int depth, Map<String, LineageOnDemandConstraints>
lineageConstraintsMap, AtlasLineageInfo ret) throws AtlasBaseException {
+ for (AtlasEdge processEdge : processEdges) {
+ boolean isInputEdge =
processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+
+ if (incrementAndCheckIfRelationsLimitReached(processEdge,
isInputEdge, lineageConstraintsMap, ret)) {
+ break;
+ } else {
+ addEdgeToResult(processEdge, ret);
+ }
+
+ AtlasVertex datasetVertex = processEdge.getInVertex();
+
+ String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex);
+ LineageOnDemandConstraints inGuidLineageConstrains =
getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap);
+
+ if (!ret.getRelationsOnDemand().containsKey(inGuid)) {
+ ret.getRelationsOnDemand().put(inGuid, new
LineageInfoOnDemand(inGuidLineageConstrains));
+ }
+
+ traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new
HashSet<>(), lineageConstraintsMap, ret);
+ }
+ }
+
+ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean
isInput, int depth, Set<String> visitedVertices, Map<String,
LineageOnDemandConstraints> lineageConstraintsMap, AtlasLineageInfo ret) throws
AtlasBaseException {
+ if (depth != 0) {
+ // keep track of visited vertices to avoid circular loop
+ visitedVertices.add(getId(datasetVertex));
+
+ Iterable<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN,
isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE);
+
+ for (AtlasEdge incomingEdge : incomingEdges) {
+
+ if (incrementAndCheckIfRelationsLimitReached(incomingEdge,
!isInput, lineageConstraintsMap, ret)) {
+ break;
+ } else {
+ addEdgeToResult(incomingEdge, ret);
+ }
+
+ AtlasVertex processVertex =
incomingEdge.getOutVertex();
+ Iterable<AtlasEdge> outgoingEdges =
processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE :
PROCESS_OUTPUTS_EDGE);
+
+ for (AtlasEdge outgoingEdge : outgoingEdges) {
+
+ if (incrementAndCheckIfRelationsLimitReached(outgoingEdge,
isInput, lineageConstraintsMap, ret)) {
+ break;
+ } else {
+ addEdgeToResult(outgoingEdge, ret);
+ }
+
+ AtlasVertex entityVertex = outgoingEdge.getInVertex();
+
+ if (entityVertex != null &&
!visitedVertices.contains(getId(entityVertex))) {
+ traverseEdgesOnDemand(entityVertex, isInput, depth -
1, visitedVertices, lineageConstraintsMap, ret);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge
atlasEdge, boolean isInput, Map<String, LineageOnDemandConstraints>
lineageConstraintsMap, AtlasLineageInfo ret) {
+
+ if (lineageContainsEdge(ret, atlasEdge)) {
+ return false;
+ }
+
+ boolean hasRelationsLimitReached = false;
+
+ AtlasVertex inVertex = isInput ?
atlasEdge.getOutVertex() : atlasEdge.getInVertex();
+ String inGuid =
AtlasGraphUtilsV2.getIdFromVertex(inVertex);
+ LineageOnDemandConstraints inGuidLineageConstraints =
getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap);
+
+ AtlasVertex outVertex = isInput ?
atlasEdge.getInVertex() : atlasEdge.getOutVertex();
+ String outGuid =
AtlasGraphUtilsV2.getIdFromVertex(outVertex);
+ LineageOnDemandConstraints outGuidLineageConstraints =
getAndValidateLineageConstraintsByGuid(outGuid, lineageConstraintsMap);
+
+ LineageInfoOnDemand inLineageInfo =
ret.getRelationsOnDemand().containsKey(inGuid) ?
ret.getRelationsOnDemand().get(inGuid) : new
LineageInfoOnDemand(inGuidLineageConstraints);
+ LineageInfoOnDemand outLineageInfo =
ret.getRelationsOnDemand().containsKey(outGuid) ?
ret.getRelationsOnDemand().get(outGuid) : new
LineageInfoOnDemand(outGuidLineageConstraints);
+
+ if (inLineageInfo.isInputRelationsReachedLimit()) {
+ inLineageInfo.setHasMoreInputs(true);
+ hasRelationsLimitReached = true;
+ }else {
+ inLineageInfo.incrementInputRelationsCount();
+ }
+
+ if (outLineageInfo.isOutputRelationsReachedLimit()) {
+ outLineageInfo.setHasMoreOutputs(true);
+ hasRelationsLimitReached = true;
+ } else {
+ outLineageInfo.incrementOutputRelationsCount();
+ }
+
+ if (!hasRelationsLimitReached) {
+ ret.getRelationsOnDemand().put(inGuid, inLineageInfo);
+ ret.getRelationsOnDemand().put(outGuid, outLineageInfo);
+ }
+
+ return hasRelationsLimitReached;
+ }
+
private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo)
throws AtlasBaseException {
- if (!lineageContainsEdge(lineageInfo, edge)) {
+ if (!lineageContainsEdge(lineageInfo, edge) &&
!lineageMaxNodeCountReached(lineageInfo.getRelations())) {
processEdge(edge, lineageInfo);
}
}
+    /**
+     * Returns the effective cap used by lineageMaxNodeCountReached: the smaller of
+     * DEFAULT_LINEAGE_MAX_NODE_COUNT and the configured LINEAGE_MAX_NODE_COUNT value.
+     */
+    private int getLineageMaxNodeAllowedCount() {
+        final int configuredMaxCount = AtlasConfiguration.LINEAGE_MAX_NODE_COUNT.getInt();
+
+        return Math.min(configuredMaxCount, DEFAULT_LINEAGE_MAX_NODE_COUNT);
+    }
+
+    /**
+     * Tells whether the given relation set has grown beyond the allowed maximum.
+     *
+     * A null or empty set never counts as having reached the limit.
+     * NOTE(review): the comparison is strict ('>'), so a caller that checks before inserting
+     * can end up with limit + 1 relations - confirm this is intended.
+     *
+     * @param relations relations collected so far
+     * @return true if the set is non-empty and its size exceeds getLineageMaxNodeAllowedCount()
+     */
+    private boolean lineageMaxNodeCountReached(Set<LineageRelation> relations) {
+        if (CollectionUtils.isEmpty(relations)) {
+            return false;
+        }
+
+        final int allowedCount = getLineageMaxNodeAllowedCount();
+
+        return relations.size() > allowedCount;
+    }
+
+    /**
+     * Builds the key under which an edge is remembered as visited.
+     *
+     * @param inGuid       guid placed first in the composite key
+     * @param outGuid      guid placed second in the composite key
+     * @param relationGuid relationship guid of the edge
+     * @return inGuid + SEPARATOR + outGuid when on-demand lineage is enabled, otherwise relationGuid
+     */
+    private String getVisitedEdgeLabel(String inGuid, String outGuid, String relationGuid) {
+        return isLineageOnDemandEnabled() ? inGuid + SEPARATOR + outGuid : relationGuid;
+    }
+
private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo,
AtlasEdge edge) {
boolean ret = false;
- if (lineageInfo != null &&
CollectionUtils.isNotEmpty(lineageInfo.getRelations()) && edge != null) {
- String relationGuid =
AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY,
String.class);
- Set<LineageRelation> relations = lineageInfo.getRelations();
+ if (edge != null && lineageInfo != null &&
CollectionUtils.isNotEmpty(lineageInfo.getVisitedEdges())) {
+ String inGuid =
AtlasGraphUtilsV2.getIdFromVertex(edge.getInVertex());
+ String outGuid =
AtlasGraphUtilsV2.getIdFromVertex(edge.getOutVertex());
+ String relationGuid =
AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY,
String.class);
+ boolean isInputEdge =
edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+ String visitedEdgeLabel = isInputEdge ?
getVisitedEdgeLabel(inGuid, outGuid, relationGuid) :
getVisitedEdgeLabel(outGuid, inGuid, relationGuid);
- for (LineageRelation relation : relations) {
- if (relation.getRelationshipId().equals(relationGuid)) {
- ret = true;
- break;
- }
+ if (lineageInfo.getVisitedEdges().contains(visitedEdgeLabel)) {
+ ret = true;
}
}
@@ -356,11 +616,11 @@ public class EntityLineageService implements
AtlasLineageService {
}
private void processEdge(final AtlasEdge edge, final AtlasLineageInfo
lineageInfo) throws AtlasBaseException {
- processEdge(edge, lineageInfo.getGuidEntityMap(),
lineageInfo.getRelations());
+ processEdge(edge, lineageInfo.getGuidEntityMap(),
lineageInfo.getRelations(), lineageInfo.getVisitedEdges());
}
private AtlasLineageInfo initializeLineageInfo(String guid,
LineageDirection direction, int depth) {
- return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(),
direction, depth);
+ return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(),
new HashSet<>(), new HashMap<>(), direction, depth);
}
private static String getId(AtlasVertex vertex) {
@@ -383,6 +643,10 @@ public class EntityLineageService implements
AtlasLineageService {
}
private void processEdge(final AtlasEdge edge, final Map<String,
AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws
AtlasBaseException {
+ processEdge(edge, entities, relations, null);
+ }
+
+ private void processEdge(final AtlasEdge edge, final Map<String,
AtlasEntityHeader> entities, final Set<LineageRelation> relations, final
Set<String> visitedEdges) throws AtlasBaseException {
AtlasVertex inVertex = edge.getInVertex();
AtlasVertex outVertex = edge.getOutVertex();
String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex);
@@ -390,6 +654,7 @@ public class EntityLineageService implements
AtlasLineageService {
String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge,
RELATIONSHIP_GUID_PROPERTY_KEY, String.class);
boolean isInputEdge =
edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);
+
if (!entities.containsKey(inGuid)) {
AtlasEntityHeader entityHeader =
entityRetriever.toAtlasEntityHeader(inVertex);
entities.put(inGuid, entityHeader);
@@ -405,6 +670,11 @@ public class EntityLineageService implements
AtlasLineageService {
} else {
relations.add(new LineageRelation(outGuid, inGuid, relationGuid));
}
+
+ if (visitedEdges != null) {
+ String visitedEdgeLabel = isInputEdge ?
getVisitedEdgeLabel(inGuid, outGuid, relationGuid) :
getVisitedEdgeLabel(outGuid, inGuid, relationGuid);
+ visitedEdges.add(visitedEdgeLabel);
+ }
}
private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth,
boolean isDataSet) throws AtlasBaseException {
@@ -448,4 +718,8 @@ public class EntityLineageService implements
AtlasLineageService {
return ret;
}
+
+    /**
+     * Reports whether on-demand lineage is switched on.
+     *
+     * @return the current boolean value of AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED
+     */
+    public boolean isLineageOnDemandEnabled() {
+        final boolean onDemandEnabled = AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean();
+
+        return onDemandEnabled;
+    }
}
\ No newline at end of file
diff --git
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index d55ada77e..b19095b48 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -184,6 +184,8 @@ public class AdminResource {
private final AtlasDebugMetricsSink debugMetricsRESTSink;
private final boolean isDebugMetricsEnabled;
private final boolean isTasksEnabled;
+ private final boolean isOnDemandLineageEnabled;
+ private final int defaultLineageNodeCount;
static {
try {
@@ -224,12 +226,16 @@ public class AdminResource {
this.uiDateFormat = atlasProperties.getString(UI_DATE_FORMAT,
UI_DATE_DEFAULT_FORMAT);
this.isDebugMetricsEnabled =
AtlasConfiguration.DEBUG_METRICS_ENABLED.getBoolean();
this.isTasksEnabled =
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
+ this.isOnDemandLineageEnabled =
AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean();
+ this.defaultLineageNodeCount =
AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt();
} else {
this.defaultUIVersion = UI_VERSION_V2;
this.isTimezoneFormatEnabled = true;
this.uiDateFormat = UI_DATE_DEFAULT_FORMAT;
this.isDebugMetricsEnabled = false;
this.isTasksEnabled = false;
+ this.isOnDemandLineageEnabled = false;
+ this.defaultLineageNodeCount = 3;
}
}
@@ -379,6 +385,8 @@ public class AdminResource {
responseData.put(UI_DATE_FORMAT, uiDateFormat);
responseData.put(AtlasConfiguration.DEBUG_METRICS_ENABLED.getPropertyName(),
isDebugMetricsEnabled);
responseData.put(AtlasConfiguration.TASKS_USE_ENABLED.getPropertyName(),
isTasksEnabled);
+
responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(),
isOnDemandLineageEnabled);
+
responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getPropertyName(),
defaultLineageNodeCount);
if (AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt() != -1) {
responseData.put(AtlasConfiguration.SESSION_TIMEOUT_SECS.getPropertyName(),
AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt());
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
index 30b562607..c66200c9b 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java
@@ -19,6 +19,7 @@
package org.apache.atlas.web.rest;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.discovery.AtlasLineageService;
@@ -26,6 +27,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -33,6 +35,7 @@ import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
@@ -43,6 +46,7 @@ import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import javax.ws.rs.POST;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
@@ -59,13 +63,15 @@ import java.util.Map;
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
public class LineageREST {
+ private static final Logger LOG =
LoggerFactory.getLogger(LineageREST.class);
private static final Logger PERF_LOG =
AtlasPerfTracer.getPerfLogger("rest.LineageREST");
private static final String PREFIX_ATTR = "attr:";
private final AtlasTypeRegistry typeRegistry;
private final AtlasLineageService atlasLineageService;
- private static final String DEFAULT_DIRECTION = "BOTH";
- private static final String DEFAULT_DEPTH = "3";
+
+ private static final String DEFAULT_DIRECTION = "BOTH";
+ private static final String DEFAULT_DEPTH = "3";
@Context
private HttpServletRequest httpServletRequest;
@@ -109,6 +115,42 @@ public class LineageREST {
}
}
+    /**
+     * Returns on-demand lineage info about an entity, restricted by the supplied constraints.
+     * @param guid unique id of the base entity
+     * @param lineageConstraintsMapByGuid lineage constraints keyed by entity guid (request payload)
+     * @return AtlasLineageInfo
+     * @throws AtlasBaseException if on-demand lineage is not enabled, or if raised by the validation or lineage lookup
+     * @HTTP 200 If Lineage exists for the given entity
+     * @HTTP 400 Bad query parameters
+     * @HTTP 404 If no lineage is found for the given entity
+     */
+    @POST
+    @Path("/{guid}")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Timed
+    public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid,
+                                            Map<String, LineageOnDemandConstraints> lineageConstraintsMapByGuid) throws AtlasBaseException {
+        final boolean onDemandEnabled = AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean();
+
+        // Reject the request up front when the feature flag is off.
+        if (!onDemandEnabled) {
+            final String propertyName = AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName();
+
+            LOG.warn("LineageREST: " + AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(propertyName));
+
+            throw new AtlasBaseException(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED, propertyName);
+        }
+
+        Servlets.validateQueryParamLength("guid", guid);
+
+        AtlasPerfTracer perf = null;
+
+        try {
+            final boolean perfTraceEnabled = AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG);
+
+            if (perfTraceEnabled) {
+                final String tracerName = "LineageREST.getLineageGraph(" + guid + "," + lineageConstraintsMapByGuid + ")";
+
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, tracerName);
+            }
+
+            return atlasLineageService.getAtlasLineageInfo(guid, lineageConstraintsMapByGuid);
+        } finally {
+            // Always hand the tracer (possibly null) back for logging.
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
/**
* Returns lineage info about entity.
*
diff --git
a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index 9d5a9e5c3..c2ccec795 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -104,6 +104,7 @@ public abstract class BaseResourceIT {
public static final String SEC_TAG = "sec_Tag";
public static final String FINANCE_TAG = "finance_Tag";
public static final String CLASSIFICATION = "classification";
+ public static final String ATLAS_LINEAGE_ON_DEMAND_ENABLED =
"atlas.lineage.on.demand.enabled";
protected static final int MAX_WAIT_TIME = 60000;
@@ -111,6 +112,7 @@ public abstract class BaseResourceIT {
protected AtlasClient atlasClientV1;
protected AtlasClientV2 atlasClientV2;
protected String[] atlasUrls;
+ protected boolean isLineageOnDemandEnabled;
protected NotificationInterface notificationInterface = null;
@@ -123,11 +125,16 @@ public abstract class BaseResourceIT {
ApplicationProperties.get().setProperty("atlas.client.readTimeoutMSecs",
"100000000");
ApplicationProperties.get().setProperty("atlas.client.connectTimeoutMSecs",
"100000000");
-
Configuration configuration = ApplicationProperties.get();
+ isLineageOnDemandEnabled =
configuration.getBoolean(ATLAS_LINEAGE_ON_DEMAND_ENABLED);
+ if (!isLineageOnDemandEnabled) {
+
ApplicationProperties.get().setProperty(ATLAS_LINEAGE_ON_DEMAND_ENABLED, true);
+ }
+
atlasUrls = configuration.getStringArray(ATLAS_REST_ADDRESS);
+
if (atlasUrls == null || atlasUrls.length == 0) {
atlasUrls = new String[] { "http://localhost:21000/" };
}
diff --git
a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
index e7b67f21e..e2c86bcc4 100644
---
a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
+++
b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java
@@ -18,8 +18,12 @@
package org.apache.atlas.web.integration;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
+import org.apache.atlas.model.lineage.LineageOnDemandConstraints;
import org.apache.atlas.v1.model.instance.Id;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.slf4j.Logger;
@@ -44,6 +48,7 @@ public class LineageClientV2IT extends
DataSetLineageJerseyResourceIT {
private static final Logger LOG =
LoggerFactory.getLogger(LineageClientV2IT.class);
private String salesFactTable;
private String salesMonthlyTable;
+ private String salesMonthlyTableOnDemand;
private String salesDBName;
@BeforeClass
@@ -74,6 +79,67 @@ public class LineageClientV2IT extends
DataSetLineageJerseyResourceIT {
Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
}
+ @Test
+ public void testGetLineageInfoOnDemand() throws Exception { // compares the classic lineage response with the on-demand response for the same table
+ String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE,
+ REFERENCEABLE_ATTRIBUTE_NAME,
salesMonthlyTableOnDemand).getId()._getId();
+
+ //Get entire Lineage Info through the classic call (INPUT direction, depth 5); this response must carry no on-demand data
+ AtlasLineageInfo inputLineageInfo =
atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT,
5);
+ Assert.assertNotNull(inputLineageInfo);
+ Map<String, AtlasEntityHeader> entities =
inputLineageInfo.getGuidEntityMap();
+ Assert.assertNotNull(entities);
+
+ Set<AtlasLineageInfo.LineageRelation> relations =
inputLineageInfo.getRelations();
+ Assert.assertNotNull(relations);
+
+ Map<String, AtlasLineageInfo.LineageInfoOnDemand> relationsOnDemand =
inputLineageInfo.getRelationsOnDemand();
+ Assert.assertTrue(relationsOnDemand == null ||
relationsOnDemand.size() == 0);
+
+ Assert.assertEquals(entities.size(), 21);
+ Assert.assertEquals(relations.size(), 20);
+ Assert.assertEquals(inputLineageInfo.getLineageDirection(),
AtlasLineageInfo.LineageDirection.INPUT);
+ Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5);
+ Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
+
+ //Get lineage info on-demand with input and output limit as 3 (the constraints are keyed by the base table's guid)
+
ApplicationProperties.get().setProperty(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(),
true);
+ // NOTE(review): the setProperty call above changes ApplicationProperties in the test JVM; whether the server under test observes it is not visible here - confirm
+ LineageOnDemandConstraints lineageConstraints = new
LineageOnDemandConstraints(AtlasLineageInfo.LineageDirection.INPUT, 3, 3, 5);
+ Map<String, LineageOnDemandConstraints> lineageConstraintsByGuid = new
HashMap<>();
+ lineageConstraintsByGuid.put(tableId, lineageConstraints);
+ // Fail fast when isLineageOnDemandEnabled is false, before the on-demand call below is attempted
+ if (!isLineageOnDemandEnabled) {
+
Assert.fail(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(ATLAS_LINEAGE_ON_DEMAND_ENABLED));
+ }
+
+ AtlasLineageInfo inputLineageInfoOnDemand =
atlasClientV2.getLineageInfoOnDemand(tableId, lineageConstraintsByGuid);
+ Assert.assertNotNull(inputLineageInfoOnDemand);
+ entities = inputLineageInfoOnDemand.getGuidEntityMap();
+ Assert.assertNotNull(entities);
+
+ relations = inputLineageInfoOnDemand.getRelations();
+ Assert.assertNotNull(relations);
+
+ relationsOnDemand = inputLineageInfoOnDemand.getRelationsOnDemand();
+ Assert.assertNotNull(relationsOnDemand);
+ // With the constraints above a smaller graph than the 21 entities / 20 relations is expected, plus exactly one on-demand entry
+ Assert.assertEquals(entities.size(), 7);
+ Assert.assertEquals(relations.size(), 6);
+ Assert.assertEquals(relationsOnDemand.size(), 1);
+
+ Assert.assertEquals(inputLineageInfoOnDemand.getLineageDirection(),
AtlasLineageInfo.LineageDirection.INPUT);
+ Assert.assertEquals(inputLineageInfoOnDemand.getLineageDepth(), 5);
+ Assert.assertEquals(inputLineageInfoOnDemand.getBaseEntityGuid(),
tableId);
+ // The single on-demand entry must belong to the base table and report that more inputs exist than were returned
+ AtlasLineageInfo.LineageInfoOnDemand relationsOnDemandByTableId =
relationsOnDemand.get(tableId);
+ Assert.assertNotNull(relationsOnDemandByTableId);
+
+ boolean hasMoreInputs = relationsOnDemandByTableId.hasMoreInputs();
+ Assert.assertTrue(hasMoreInputs);
+
+ }
+
@Test
public void testGetLineageInfoByAttribute() throws Exception {
Map<String, String> attributeMap = new HashMap<>();
@@ -129,5 +195,19 @@ public class LineageClientV2IT extends
DataSetLineageJerseyResourceIT {
loadProcess("loadSalesMonthly" + randomString(), "John ETL",
Collections.singletonList(salesFactDaily),
Collections.singletonList(salesFactMonthly), "create table as
select ", "plan", "id", "graph");
+
+ salesMonthlyTableOnDemand = "sales_fact_monthly_mv_on_demand_" +
randomString();
+ Id salesFactMonthlyOnDemand =
+ table(salesMonthlyTableOnDemand, "sales fact monthly
materialized view", salesDB, "Jane BI",
+ "MANAGED", salesFactColumns);
+
+ for (int i = 1; i <= 10; i++) {
+ Id salesFactDailyOnDemand =
+ table("sales_fact_daily_mv_on_demand_" + randomString() +
"_" + i, "sales fact daily materialized view -"+i, salesDB,
+ "Joe BI", "MANAGED", salesFactColumns);
+
+ loadProcess("loadSalesMonthly" + randomString() + "_" + i, "John
ETL", Collections.singletonList(salesFactDailyOnDemand),
+ Collections.singletonList(salesFactMonthlyOnDemand),
"create table as select ", "plan", "id", "graph");
+ }
}
}
diff --git a/webapp/src/test/resources/atlas-application.properties
b/webapp/src/test/resources/atlas-application.properties
index 2f433295d..b863ed84b 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -131,3 +131,6 @@ atlas.search.gremlin.enable=true
######### Configure use of Tasks #########
atlas.tasks.enabled=false
atlas.debug.metrics.enabled=true
+
+######### Configure on-demand lineage #########
+atlas.lineage.on.demand.enabled=true