This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 7ba5f191c ATLAS-4606: Improve Atlas Lineage API performance (fetch on-demand) 7ba5f191c is described below commit 7ba5f191c5701f6fa850d18895951df6d20ab812 Author: radhikakundam <radhikakun...@apache.org> AuthorDate: Mon Jul 25 16:42:38 2022 -0700 ATLAS-4606: Improve Atlas Lineage API performance (fetch on-demand) Signed-off-by: radhikakundam <radhikakun...@apache.org> --- .../main/java/org/apache/atlas/AtlasClientV2.java | 5 + .../java/org/apache/atlas/AtlasConfiguration.java | 3 + .../main/java/org/apache/atlas/AtlasErrorCode.java | 2 + .../atlas/model/lineage/AtlasLineageInfo.java | 175 +++++++++++- .../model/lineage/LineageOnDemandConstraints.java | 92 ++++++ .../test/resources/atlas-application.properties | 3 + .../atlas/discovery/AtlasLineageService.java | 10 + .../atlas/discovery/EntityLineageService.java | 314 +++++++++++++++++++-- .../apache/atlas/web/resources/AdminResource.java | 8 + .../org/apache/atlas/web/rest/LineageREST.java | 46 ++- .../atlas/web/integration/BaseResourceIT.java | 9 +- .../atlas/web/integration/LineageClientV2IT.java | 80 ++++++ .../test/resources/atlas-application.properties | 3 + 13 files changed, 716 insertions(+), 34 deletions(-) diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java index 85ca78bf9..6910b0e42 100644 --- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java +++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java @@ -56,6 +56,7 @@ import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.LineageOnDemandConstraints; import org.apache.atlas.model.profile.AtlasUserSavedSearch; import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; @@ -627,6 +628,9 @@ public class AtlasClientV2 extends AtlasBaseClient { return callAPI(API_V2.GET_LINEAGE_BY_ATTRIBUTES, AtlasLineageInfo.class, queryParams, typeName); } + public AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String, LineageOnDemandConstraints> lineageConstraintsByGuid) throws AtlasServiceException { + return callAPI(API_V2.LINEAGE_INFO_ON_DEMAND, AtlasLineageInfo.class, lineageConstraintsByGuid, guid); + } /* Discovery APIs */ public AtlasSearchResult dslSearch(String query) throws AtlasServiceException { @@ -1206,6 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient { // Lineage APIs public static final API_V2 LINEAGE_INFO = new API_V2(LINEAGE_URI, HttpMethod.GET, Response.Status.OK); public static final API_V2 GET_LINEAGE_BY_ATTRIBUTES = new API_V2(LINEAGE_URI + "uniqueAttribute/type/", HttpMethod.GET, Response.Status.OK); + public static final API_V2 LINEAGE_INFO_ON_DEMAND = new API_V2(LINEAGE_URI, HttpMethod.POST, Response.Status.OK); // Discovery APIs public static final API_V2 DSL_SEARCH = new API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK); diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 28fb68aef..e8c7a15ea 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -71,6 +71,9 @@ public enum AtlasConfiguration { IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""), MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0), LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false), + LINEAGE_ON_DEMAND_ENABLED("atlas.lineage.on.demand.enabled", false), + LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT("atlas.lineage.on.demand.default.node.count", 3), + LINEAGE_MAX_NODE_COUNT("atlas.lineage.max.node.count", 9000), HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"), STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true), diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index ab35f8724..608342433 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -175,6 +175,7 @@ public enum AtlasErrorCode { ATTRIBUTE_NAME_ALREADY_EXISTS_IN_PARENT_TYPE(400, "ATLAS-400-00-09D", "Invalid attribute name: {0}.{1}. Attribute already exists in parent type: {2}"), ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400, "ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists in another parent type: {2}"), IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry. Reason: {1}"), + LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand config: {0} is not enabled"), UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"), @@ -200,6 +201,7 @@ public enum AtlasErrorCode { NO_PROPAGATED_CLASSIFICATIONS_FOUND_FOR_ENTITY(404, "ATLAS-404-00-013", "No propagated classifications associated with entity: {0}"), FILE_NAME_NOT_FOUND(404, "ATLAS-404-00-014", "File name should not be blank"), NO_TYPE_NAME_ON_VERTEX(404, "ATLAS-404-00-015", "No typename found for given entity with guid: {0}"), + NO_LINEAGE_CONSTRAINTS_FOR_GUID(404, "ATLAS-404-00-016", "No lineage constraints found for requested entity with guid : {0}"), METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request method {0} is inappropriate for the URL: {1}"), diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java index 27186ca7c..9ae6039f1 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java @@ -20,6 +20,7 @@ package org.apache.atlas.model.lineage; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -37,15 +38,19 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) +@JsonIgnoreProperties(ignoreUnknown = true, value = {"visitedEdges"}) @XmlRootElement @XmlAccessorType(XmlAccessType.PROPERTY) public class AtlasLineageInfo implements Serializable { - private String baseEntityGuid; - private LineageDirection lineageDirection; - private int lineageDepth; - private Map<String, AtlasEntityHeader> guidEntityMap; - private Set<LineageRelation> relations; + + private String baseEntityGuid; + private LineageDirection lineageDirection; + private int lineageDepth; + private Map<String, AtlasEntityHeader> guidEntityMap; + private Set<LineageRelation> relations; + private Set<String> visitedEdges; + private Map<String, LineageInfoOnDemand> relationsOnDemand; + private Map<String, LineageOnDemandConstraints> lineageOnDemandPayload; public AtlasLineageInfo() {} @@ -62,11 +67,18 @@ public class AtlasLineageInfo implements Serializable { */ public AtlasLineageInfo(String baseEntityGuid, Map<String, AtlasEntityHeader> guidEntityMap, Set<LineageRelation> relations, LineageDirection lineageDirection, int lineageDepth) { - this.baseEntityGuid = baseEntityGuid; - this.lineageDirection = lineageDirection; - this.lineageDepth = lineageDepth; - this.guidEntityMap = guidEntityMap; - this.relations = relations; + this(baseEntityGuid, guidEntityMap, relations, null, null, lineageDirection, lineageDepth); + } + + public AtlasLineageInfo(String baseEntityGuid, Map<String, AtlasEntityHeader> guidEntityMap, + Set<LineageRelation> relations, Set<String> visitedEdges, Map<String, LineageInfoOnDemand> relationsOnDemand, LineageDirection lineageDirection, int lineageDepth) { + this.baseEntityGuid = baseEntityGuid; + this.lineageDirection = lineageDirection; + this.lineageDepth = lineageDepth; + this.guidEntityMap = guidEntityMap; + this.relations = relations; + this.visitedEdges = visitedEdges; + this.relationsOnDemand = relationsOnDemand; } public String getBaseEntityGuid() { @@ -93,6 +105,22 @@ public class AtlasLineageInfo implements Serializable { this.relations = relations; } + public Set<String> getVisitedEdges() { + return visitedEdges; + } + + public void setVisitedEdges(Set<String> visitedEdges) { + this.visitedEdges = visitedEdges; + } + + public Map<String, LineageInfoOnDemand> getRelationsOnDemand() { + return relationsOnDemand; + } + + public void setRelationsOnDemand(Map<String, LineageInfoOnDemand> relationsOnDemand) { + this.relationsOnDemand = relationsOnDemand; + } + public LineageDirection getLineageDirection() { return lineageDirection; } @@ -109,6 +137,14 @@ public class AtlasLineageInfo implements Serializable { this.lineageDepth = lineageDepth; } + public Map<String, LineageOnDemandConstraints> getLineageOnDemandPayload() { + return lineageOnDemandPayload; + } + + public void setLineageOnDemandPayload(Map<String, LineageOnDemandConstraints> lineageOnDemandPayload) { + this.lineageOnDemandPayload = lineageOnDemandPayload; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -132,6 +168,7 @@ public class AtlasLineageInfo implements Serializable { "baseEntityGuid=" + baseEntityGuid + ", guidEntityMap=" + guidEntityMap + ", relations=" + relations + + ", relationsOnDemand=" + relationsOnDemand + ", lineageDirection=" + lineageDirection + ", lineageDepth=" + lineageDepth + '}'; @@ -204,4 +241,120 @@ public class AtlasLineageInfo implements Serializable { } } + @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true, value = {"inputRelationsReachedLimit", "outputRelationsReachedLimit"}) + @XmlRootElement + @XmlAccessorType(XmlAccessType.PROPERTY) + public static class LineageInfoOnDemand { + @JsonProperty + boolean hasMoreInputs; + @JsonProperty + boolean hasMoreOutputs; + int inputRelationsCount; + int outputRelationsCount; + boolean isInputRelationsReachedLimit; + boolean isOutputRelationsReachedLimit; + LineageOnDemandConstraints onDemandConstraints; + + public LineageInfoOnDemand() { } + + public LineageInfoOnDemand(LineageOnDemandConstraints onDemandConstraints) { + this.onDemandConstraints = onDemandConstraints; + this.hasMoreInputs = false; + this.hasMoreOutputs = false; + this.inputRelationsCount = 0; + this.outputRelationsCount = 0; + this.isInputRelationsReachedLimit = false; + this.isOutputRelationsReachedLimit = false; + } + + public boolean isInputRelationsReachedLimit() { + return isInputRelationsReachedLimit; + } + + public void setInputRelationsReachedLimit(boolean inputRelationsReachedLimit) { + isInputRelationsReachedLimit = inputRelationsReachedLimit; + } + + public boolean isOutputRelationsReachedLimit() { + return isOutputRelationsReachedLimit; + } + + public void setOutputRelationsReachedLimit(boolean outputRelationsReachedLimit) { + isOutputRelationsReachedLimit = outputRelationsReachedLimit; + } + + public boolean hasMoreInputs() { + return hasMoreInputs; + } + + public void setHasMoreInputs(boolean hasMoreInputs) { + this.hasMoreInputs = hasMoreInputs; + } + + public boolean hasMoreOutputs() { + return hasMoreOutputs; + } + + public void setHasMoreOutputs(boolean hasMoreOutputs) { + this.hasMoreOutputs = hasMoreOutputs; + } + + public int getInputRelationsCount() { + return inputRelationsCount; + } + + public void incrementInputRelationsCount() { + if (hasMoreInputs) { + return; + } + + if (isInputRelationsReachedLimit) { + setHasMoreInputs(true); + return; + } + + this.inputRelationsCount++; + + if (inputRelationsCount == onDemandConstraints.getInputRelationsLimit()) { + this.setInputRelationsReachedLimit(true); + return; + } + } + + public int getOutputRelationsCount() { + return outputRelationsCount; + } + + public void incrementOutputRelationsCount() { + if (hasMoreOutputs) { + return; + } + + if (isOutputRelationsReachedLimit) { + setHasMoreOutputs(true); + return; + } + + this.outputRelationsCount++; + + if (outputRelationsCount == onDemandConstraints.getOutputRelationsLimit()) { + this.setOutputRelationsReachedLimit(true); + return; + } + } + + @Override + public String toString() { + return "LineageInfoOnDemand{" + + "hasMoreInputs='" + hasMoreInputs + '\'' + + ", hasMoreOutputs='" + hasMoreOutputs + '\'' + + ", inputRelationsCount='" + inputRelationsCount + '\'' + + ", outputRelationsCount='" + outputRelationsCount + '\'' + + '}'; + } + + } + } \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java new file mode 100644 index 000000000..0b3ea2418 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandConstraints.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.lineage; + + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; + +import java.io.Serializable; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +/** + * This is the root class representing the input for lineage search on-demand. + */ +public class LineageOnDemandConstraints implements Serializable { + private static final long serialVersionUID = 1L; + + private LineageDirection direction; + private int inputRelationsLimit; + private int outputRelationsLimit; + private int depth; + + private static final int LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT = AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt(); + private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; + + public LineageOnDemandConstraints() { + this(LineageDirection.BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH); + } + + public LineageOnDemandConstraints(LineageDirection direction, int inputRelationsLimit, int outputRelationsLimit, int depth) { + this.direction = direction; + this.inputRelationsLimit = inputRelationsLimit; + this.outputRelationsLimit = outputRelationsLimit; + this.depth = depth; + } + + public LineageDirection getDirection() { + return direction; + } + + public void setDirection(LineageDirection direction) { + this.direction = direction; + } + + public int getInputRelationsLimit() { + return inputRelationsLimit; + } + + public void setInputRelationsLimit(int inputRelationsLimit) { + this.inputRelationsLimit = inputRelationsLimit; + } + + public int getOutputRelationsLimit() { + return outputRelationsLimit; + } + + public void setOutputRelationsLimit(int outputRelationsLimit) { + this.outputRelationsLimit = outputRelationsLimit; + } + + public int getDepth() { + return depth; + } + + public void setDepth(int depth) { + this.depth = depth; + } + +} diff --git a/intg/src/test/resources/atlas-application.properties b/intg/src/test/resources/atlas-application.properties index 2139cec24..1156abcca 100644 --- a/intg/src/test/resources/atlas-application.properties +++ b/intg/src/test/resources/atlas-application.properties @@ -146,3 +146,6 @@ atlas.search.gremlin.enable=true ######### Configure use of Tasks ######### atlas.tasks.enabled=false + +######### Configure on-demand lineage ######### +atlas.lineage.on.demand.enabled=true diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java index 8dc6d3a19..0b518b904 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageService.java @@ -22,8 +22,11 @@ package org.apache.atlas.discovery; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.LineageOnDemandConstraints; import org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails; +import java.util.Map; + public interface AtlasLineageService { /** * @param entityGuid unique ID of the entity @@ -33,6 +36,13 @@ public interface AtlasLineageService { */ AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException; + /** + * @param entityGuid unique ID of the entity + * @param lineageOnDemandConstraintsByGuid map of constraints to fetch lineage for each guid + * @return AtlasLineageInfo + */ + AtlasLineageInfo getAtlasLineageInfo(String entityGuid, Map<String, LineageOnDemandConstraints> lineageOnDemandConstraintsByGuid) throws AtlasBaseException; + /** * Return the schema for the given datasetName. * diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index d18ace841..aa881f2bd 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -31,8 +31,10 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageInfoOnDemand; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; +import org.apache.atlas.model.lineage.LineageOnDemandConstraints; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -59,6 +61,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -80,10 +83,14 @@ import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery. public class EntityLineageService implements AtlasLineageService { private static final Logger LOG = LoggerFactory.getLogger(EntityLineageService.class); - private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; - private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; - private static final String COLUMNS = "columns"; - private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); + private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; + private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String COLUMNS = "columns"; + private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); + private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; + private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; + private static final int LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT = AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt(); + private static final String SEPARATOR = "->"; private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; @@ -103,6 +110,40 @@ public class EntityLineageService implements AtlasLineageService { public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { AtlasLineageInfo ret; + boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); + + if (LINEAGE_USING_GREMLIN) { + ret = getLineageInfoV1(guid, direction, depth, isDataSet); + } else { + ret = getLineageInfoV2(guid, direction, depth, isDataSet); + } + + return ret; + } + + @Override + @GraphTransaction + public AtlasLineageInfo getAtlasLineageInfo(String guid, Map<String, LineageOnDemandConstraints> lineageConstraintsMap) throws AtlasBaseException { + AtlasLineageInfo ret; + + if (MapUtils.isEmpty(lineageConstraintsMap)) { + lineageConstraintsMap = new HashMap<>(); + lineageConstraintsMap.put(guid, getDefaultLineageConstraints(guid)); + } + + boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); + + ret = getLineageInfoOnDemand(guid, lineageConstraintsMap, isDataSet); + + appendLineageOnDemandPayload(ret, lineageConstraintsMap); + + // filtering out on-demand relations which has input & output nodes within the limit + cleanupRelationsOnDemand(ret); + + return ret; + } + + private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(atlasTypeRegistry, AtlasPrivilege.ENTITY_READ, entity), "read entity lineage: guid=", guid); @@ -123,13 +164,21 @@ public class EntityLineageService implements AtlasLineageService { } } - if (LINEAGE_USING_GREMLIN) { - ret = getLineageInfoV1(guid, direction, depth, isDataSet); - } else { - ret = getLineageInfoV2(guid, direction, depth, isDataSet); + return isDataSet; + } + + private void appendLineageOnDemandPayload(AtlasLineageInfo lineageInfo, Map<String, LineageOnDemandConstraints> lineageConstraintsMap) { + if (lineageInfo == null || MapUtils.isEmpty(lineageConstraintsMap)) { + return; } + lineageInfo.setLineageOnDemandPayload(lineageConstraintsMap); + } - return ret; + //Consider only relationsOnDemand which has either more inputs or more outputs than given limit + private void cleanupRelationsOnDemand(AtlasLineageInfo lineageInfo) { + if (lineageInfo != null && MapUtils.isNotEmpty(lineageInfo.getRelationsOnDemand())) { + lineageInfo.getRelationsOnDemand().entrySet().removeIf(x -> !(x.getValue().hasMoreInputs() || x.getValue().hasMoreOutputs())); + } } @Override @@ -300,6 +349,102 @@ public class EntityLineageService implements AtlasLineageService { return ret; } + private LineageOnDemandConstraints getDefaultLineageConstraints(String guid) { + if (LOG.isDebugEnabled()) { + LOG.debug("No lineage on-demand constraints provided for guid: {}, configuring with default values direction: {}, inputRelationsLimit: {}, outputRelationsLimit: {}, depth: {}", + guid, BOTH, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT, LINEAGE_ON_DEMAND_DEFAULT_DEPTH); + } + + return new LineageOnDemandConstraints(); + } + + private LineageOnDemandConstraints getAndValidateLineageConstraintsByGuid(String guid, Map<String, LineageOnDemandConstraints> lineageConstraintsMap) { + + if (lineageConstraintsMap == null || !lineageConstraintsMap.containsKey(guid)) { + return getDefaultLineageConstraints(guid); + } + + LineageOnDemandConstraints lineageConstraintsByGuid = lineageConstraintsMap.get(guid);; + if (lineageConstraintsByGuid == null) { + return getDefaultLineageConstraints(guid); + } + + if (Objects.isNull(lineageConstraintsByGuid.getDirection())) { + LOG.info("No lineage on-demand direction provided for guid: {}, configuring with default value {}", guid, LineageDirection.BOTH); + lineageConstraintsByGuid.setDirection(BOTH); + } + + if (lineageConstraintsByGuid.getInputRelationsLimit() == 0) { + LOG.info("No lineage on-demand inputRelationsLimit provided for guid: {}, configuring with default value {}", guid, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT); + lineageConstraintsByGuid.setInputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT); + } + + if (lineageConstraintsByGuid.getOutputRelationsLimit() == 0) { + LOG.info("No lineage on-demand outputRelationsLimit provided for guid: {}, configuring with default value {}", guid, LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT); + lineageConstraintsByGuid.setOutputRelationsLimit(LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT); + } + + if (lineageConstraintsByGuid.getDepth() == 0) { + LOG.info("No lineage on-demand depth provided for guid: {}, configuring with default value {}", guid, LINEAGE_ON_DEMAND_DEFAULT_DEPTH); + lineageConstraintsByGuid.setDepth(LINEAGE_ON_DEMAND_DEFAULT_DEPTH); + } + + return lineageConstraintsByGuid; + + } + + private AtlasLineageInfo getLineageInfoOnDemand(String guid, Map<String, LineageOnDemandConstraints> lineageConstraintsMap, boolean isDataSet) throws AtlasBaseException { + + LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, lineageConstraintsMap); + + if (lineageConstraintsByGuid == null) { + throw new AtlasBaseException(AtlasErrorCode.NO_LINEAGE_CONSTRAINTS_FOR_GUID, guid); + } + + LineageDirection direction = lineageConstraintsByGuid.getDirection(); + int depth = lineageConstraintsByGuid.getDepth(); + + AtlasLineageInfo ret = initializeLineageInfo(guid, direction, depth); + + if (depth == 0) { + depth = -1; + } + + if (isDataSet) { + AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + + if (!ret.getRelationsOnDemand().containsKey(guid)) { + ret.getRelationsOnDemand().put(guid, new LineageInfoOnDemand(lineageConstraintsByGuid)); + } + + if (direction == INPUT || direction == BOTH) { + traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), lineageConstraintsMap, ret); + } + + if (direction == OUTPUT || direction == BOTH) { + traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), lineageConstraintsMap, ret); + } + } else { + AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + + // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' + if (direction == INPUT || direction == BOTH) { + Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE); + + traverseEdgesOnDemand(processEdges, true, depth, lineageConstraintsMap, ret); + } + + if (direction == OUTPUT || direction == BOTH) { + Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE); + + traverseEdgesOnDemand(processEdges, false, depth, lineageConstraintsMap, ret); + } + + } + + return ret; + } + private void traverseEdges(AtlasVertex datasetVertex, boolean isInput, int depth, AtlasLineageInfo ret) throws AtlasBaseException { traverseEdges(datasetVertex, isInput, depth, new HashSet<>(), ret); } @@ -331,24 +476,139 @@ public class EntityLineageService implements AtlasLineageService { } } + private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges, boolean isInput, int depth, Map<String, LineageOnDemandConstraints> lineageConstraintsMap, AtlasLineageInfo ret) throws AtlasBaseException { + for (AtlasEdge processEdge : processEdges) { + boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + + if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, lineageConstraintsMap, ret)) { + break; + } else { + addEdgeToResult(processEdge, ret); + } + + AtlasVertex datasetVertex = processEdge.getInVertex(); + + String inGuid = AtlasGraphUtilsV2.getIdFromVertex(datasetVertex); + LineageOnDemandConstraints inGuidLineageConstrains = getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap); + + if (!ret.getRelationsOnDemand().containsKey(inGuid)) { + ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); + } + + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), lineageConstraintsMap, ret); + } + } + + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, Map<String, LineageOnDemandConstraints> lineageConstraintsMap, AtlasLineageInfo ret) throws AtlasBaseException { + if (depth != 0) { + // keep track of visited vertices to avoid circular loop + visitedVertices.add(getId(datasetVertex)); + + Iterable<AtlasEdge> incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE); + + for (AtlasEdge incomingEdge : incomingEdges) { + + if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, lineageConstraintsMap, ret)) { + break; + } else { + addEdgeToResult(incomingEdge, ret); + } + + AtlasVertex processVertex = incomingEdge.getOutVertex(); + Iterable<AtlasEdge> outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE); + + for (AtlasEdge outgoingEdge : outgoingEdges) { + + if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, lineageConstraintsMap, ret)) { + break; + } else { + addEdgeToResult(outgoingEdge, ret); + } + + AtlasVertex entityVertex = outgoingEdge.getInVertex(); + + if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, lineageConstraintsMap, ret); + } + } + } + } + } + + private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, Map<String, LineageOnDemandConstraints> lineageConstraintsMap, AtlasLineageInfo ret) { + + if (lineageContainsEdge(ret, atlasEdge)) { + return false; + } + + boolean hasRelationsLimitReached = false; + + AtlasVertex inVertex = isInput ? atlasEdge.getOutVertex() : atlasEdge.getInVertex(); + String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); + LineageOnDemandConstraints inGuidLineageConstraints = getAndValidateLineageConstraintsByGuid(inGuid, lineageConstraintsMap); + + AtlasVertex outVertex = isInput ? atlasEdge.getInVertex() : atlasEdge.getOutVertex(); + String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); + LineageOnDemandConstraints outGuidLineageConstraints = getAndValidateLineageConstraintsByGuid(outGuid, lineageConstraintsMap); + + LineageInfoOnDemand inLineageInfo = ret.getRelationsOnDemand().containsKey(inGuid) ? ret.getRelationsOnDemand().get(inGuid) : new LineageInfoOnDemand(inGuidLineageConstraints); + LineageInfoOnDemand outLineageInfo = ret.getRelationsOnDemand().containsKey(outGuid) ? ret.getRelationsOnDemand().get(outGuid) : new LineageInfoOnDemand(outGuidLineageConstraints); + + if (inLineageInfo.isInputRelationsReachedLimit()) { + inLineageInfo.setHasMoreInputs(true); + hasRelationsLimitReached = true; + }else { + inLineageInfo.incrementInputRelationsCount(); + } + + if (outLineageInfo.isOutputRelationsReachedLimit()) { + outLineageInfo.setHasMoreOutputs(true); + hasRelationsLimitReached = true; + } else { + outLineageInfo.incrementOutputRelationsCount(); + } + + if (!hasRelationsLimitReached) { + ret.getRelationsOnDemand().put(inGuid, inLineageInfo); + ret.getRelationsOnDemand().put(outGuid, outLineageInfo); + } + + return hasRelationsLimitReached; + } + private void addEdgeToResult(AtlasEdge edge, AtlasLineageInfo lineageInfo) throws AtlasBaseException { - if (!lineageContainsEdge(lineageInfo, edge)) { + if (!lineageContainsEdge(lineageInfo, edge) && !lineageMaxNodeCountReached(lineageInfo.getRelations())) { processEdge(edge, lineageInfo); } } + private int getLineageMaxNodeAllowedCount() { + return Math.min(DEFAULT_LINEAGE_MAX_NODE_COUNT, AtlasConfiguration.LINEAGE_MAX_NODE_COUNT.getInt()); + } + + private boolean lineageMaxNodeCountReached(Set<LineageRelation> relations) { + return CollectionUtils.isNotEmpty(relations) && relations.size() > getLineageMaxNodeAllowedCount(); + } + + private String getVisitedEdgeLabel(String inGuid, String outGuid, String relationGuid) { + if (isLineageOnDemandEnabled()) { + return inGuid + SEPARATOR + outGuid; + } + return relationGuid; + } + private boolean lineageContainsEdge(AtlasLineageInfo lineageInfo, AtlasEdge edge) { boolean ret = false; - if (lineageInfo != null && CollectionUtils.isNotEmpty(lineageInfo.getRelations()) && edge != null) { - String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - Set<LineageRelation> relations = lineageInfo.getRelations(); + if (edge != null && lineageInfo != null && CollectionUtils.isNotEmpty(lineageInfo.getVisitedEdges())) { + String inGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getInVertex()); + String outGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getOutVertex()); + String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + String visitedEdgeLabel = isInputEdge ? getVisitedEdgeLabel(inGuid, outGuid, relationGuid) : getVisitedEdgeLabel(outGuid, inGuid, relationGuid); - for (LineageRelation relation : relations) { - if (relation.getRelationshipId().equals(relationGuid)) { - ret = true; - break; - } + if (lineageInfo.getVisitedEdges().contains(visitedEdgeLabel)) { + ret = true; } } @@ -356,11 +616,11 @@ public class EntityLineageService implements AtlasLineageService { } private void processEdge(final AtlasEdge edge, final AtlasLineageInfo lineageInfo) throws AtlasBaseException { - processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations()); + processEdge(edge, lineageInfo.getGuidEntityMap(), lineageInfo.getRelations(), lineageInfo.getVisitedEdges()); } private AtlasLineageInfo initializeLineageInfo(String guid, LineageDirection direction, int depth) { - return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), direction, depth); + return new AtlasLineageInfo(guid, new HashMap<>(), new HashSet<>(), new HashSet<>(), new HashMap<>(), direction, depth); } private static String getId(AtlasVertex vertex) { @@ -383,6 +643,10 @@ public class EntityLineageService implements AtlasLineageService { } private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations) throws AtlasBaseException { + processEdge(edge, entities, relations, null); + } + + private void processEdge(final AtlasEdge edge, final Map<String, AtlasEntityHeader> entities, final Set<LineageRelation> relations, final Set<String> visitedEdges) throws AtlasBaseException { AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); @@ -390,6 +654,7 @@ public class EntityLineageService implements AtlasLineageService { String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex); entities.put(inGuid, entityHeader); @@ -405,6 +670,11 @@ public class EntityLineageService implements AtlasLineageService { } else { relations.add(new LineageRelation(outGuid, inGuid, relationGuid)); } + + if (visitedEdges != null) { + String visitedEdgeLabel = isInputEdge ? getVisitedEdgeLabel(inGuid, outGuid, relationGuid) : getVisitedEdgeLabel(outGuid, inGuid, relationGuid); + visitedEdges.add(visitedEdgeLabel); + } } private AtlasLineageInfo getBothLineageInfoV1(String guid, int depth, boolean isDataSet) throws AtlasBaseException { @@ -448,4 +718,8 @@ public class EntityLineageService implements AtlasLineageService { return ret; } + + public boolean isLineageOnDemandEnabled() { + return AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean(); + } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index d55ada77e..b19095b48 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -184,6 +184,8 @@ public class AdminResource { private final AtlasDebugMetricsSink debugMetricsRESTSink; private final boolean isDebugMetricsEnabled; private final boolean isTasksEnabled; + private final boolean isOnDemandLineageEnabled; + private final int defaultLineageNodeCount; static { try { @@ -224,12 +226,16 @@ public class AdminResource { this.uiDateFormat = atlasProperties.getString(UI_DATE_FORMAT, UI_DATE_DEFAULT_FORMAT); this.isDebugMetricsEnabled = AtlasConfiguration.DEBUG_METRICS_ENABLED.getBoolean(); this.isTasksEnabled = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); + this.isOnDemandLineageEnabled = AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean(); + this.defaultLineageNodeCount = AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getInt(); } else { this.defaultUIVersion = UI_VERSION_V2; this.isTimezoneFormatEnabled = true; this.uiDateFormat = UI_DATE_DEFAULT_FORMAT; this.isDebugMetricsEnabled = false; this.isTasksEnabled = false; + this.isOnDemandLineageEnabled = false; + this.defaultLineageNodeCount = 3; } } @@ -379,6 +385,8 @@ public class AdminResource { responseData.put(UI_DATE_FORMAT, uiDateFormat); responseData.put(AtlasConfiguration.DEBUG_METRICS_ENABLED.getPropertyName(), isDebugMetricsEnabled); responseData.put(AtlasConfiguration.TASKS_USE_ENABLED.getPropertyName(), isTasksEnabled); + responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(), isOnDemandLineageEnabled); + responseData.put(AtlasConfiguration.LINEAGE_ON_DEMAND_DEFAULT_NODE_COUNT.getPropertyName(), defaultLineageNodeCount); if (AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt() != -1) { responseData.put(AtlasConfiguration.SESSION_TIMEOUT_SECS.getPropertyName(), AtlasConfiguration.SESSION_TIMEOUT_SECS.getInt()); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java index 30b562607..c66200c9b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -19,6 +19,7 @@ package org.apache.atlas.web.rest; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.annotation.Timed; import org.apache.atlas.discovery.AtlasLineageService; @@ -26,6 +27,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.LineageOnDemandConstraints; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -33,6 +35,7 @@ import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.inject.Inject; @@ -43,6 +46,7 @@ import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.POST; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; @@ -59,13 +63,15 @@ import java.util.Map; @Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) @Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) public class LineageREST { + private static final Logger LOG = LoggerFactory.getLogger(LineageREST.class); private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageREST"); private static final String PREFIX_ATTR = "attr:"; private final AtlasTypeRegistry typeRegistry; private final AtlasLineageService atlasLineageService; - private static final String DEFAULT_DIRECTION = "BOTH"; - private static final String DEFAULT_DEPTH = "3"; + + private static final String DEFAULT_DIRECTION = "BOTH"; + private static final String DEFAULT_DEPTH = "3"; @Context private HttpServletRequest httpServletRequest; @@ -109,6 +115,42 @@ public class LineageREST { } } + /** + * Returns lineage info about entity. + * @return AtlasLineageInfo + * @throws AtlasBaseException + * @HTTP 200 If Lineage exists for the given entity + * @HTTP 400 Bad query parameters + * @HTTP 404 If no lineage is found for the given entity + */ + @POST + @Path("/{guid}") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + @Timed + public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid, + Map<String, LineageOnDemandConstraints> lineageConstraintsMapByGuid) throws AtlasBaseException { + if (!AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getBoolean()) { + LOG.warn("LineageREST: "+ AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName())); + + throw new AtlasBaseException(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED, AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName()); + } + + Servlets.validateQueryParamLength("guid", guid); + + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageREST.getLineageGraph(" + guid + "," + lineageConstraintsMapByGuid + ")"); + } + + return atlasLineageService.getAtlasLineageInfo(guid, lineageConstraintsMapByGuid); + } finally { + AtlasPerfTracer.log(perf); + } + } + /** * Returns lineage info about entity. * diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java index 9d5a9e5c3..c2ccec795 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java @@ -104,6 +104,7 @@ public abstract class BaseResourceIT { public static final String SEC_TAG = "sec_Tag"; public static final String FINANCE_TAG = "finance_Tag"; public static final String CLASSIFICATION = "classification"; + public static final String ATLAS_LINEAGE_ON_DEMAND_ENABLED = "atlas.lineage.on.demand.enabled"; protected static final int MAX_WAIT_TIME = 60000; @@ -111,6 +112,7 @@ public abstract class BaseResourceIT { protected AtlasClient atlasClientV1; protected AtlasClientV2 atlasClientV2; protected String[] atlasUrls; + protected boolean isLineageOnDemandEnabled; protected NotificationInterface notificationInterface = null; @@ -123,11 +125,16 @@ public abstract class BaseResourceIT { ApplicationProperties.get().setProperty("atlas.client.readTimeoutMSecs", "100000000"); ApplicationProperties.get().setProperty("atlas.client.connectTimeoutMSecs", "100000000"); - Configuration configuration = ApplicationProperties.get(); + isLineageOnDemandEnabled = configuration.getBoolean(ATLAS_LINEAGE_ON_DEMAND_ENABLED); + if (!isLineageOnDemandEnabled) { + ApplicationProperties.get().setProperty(ATLAS_LINEAGE_ON_DEMAND_ENABLED, true); + } + atlasUrls = configuration.getStringArray(ATLAS_REST_ADDRESS); + if (atlasUrls == null || atlasUrls.length == 0) { atlasUrls = new String[] { "http://localhost:21000/" }; } diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java index e7b67f21e..e2c86bcc4 100644 --- a/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/LineageClientV2IT.java @@ -18,8 +18,12 @@ package org.apache.atlas.web.integration; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.LineageOnDemandConstraints; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; import org.slf4j.Logger; @@ -44,6 +48,7 @@ public class LineageClientV2IT extends DataSetLineageJerseyResourceIT { private static final Logger LOG = LoggerFactory.getLogger(LineageClientV2IT.class); private String salesFactTable; private String salesMonthlyTable; + private String salesMonthlyTableOnDemand; private String salesDBName; @BeforeClass @@ -74,6 +79,67 @@ public class LineageClientV2IT extends DataSetLineageJerseyResourceIT { Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId); } + @Test + public void testGetLineageInfoOnDemand() throws Exception { + String tableId = atlasClientV1.getEntity(HIVE_TABLE_TYPE, + REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTableOnDemand).getId()._getId(); + + //Get entire Lineage Info + AtlasLineageInfo inputLineageInfo = atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 5); + Assert.assertNotNull(inputLineageInfo); + Map<String, AtlasEntityHeader> entities = inputLineageInfo.getGuidEntityMap(); + Assert.assertNotNull(entities); + + Set<AtlasLineageInfo.LineageRelation> relations = inputLineageInfo.getRelations(); + Assert.assertNotNull(relations); + + Map<String, AtlasLineageInfo.LineageInfoOnDemand> relationsOnDemand = inputLineageInfo.getRelationsOnDemand(); + Assert.assertTrue(relationsOnDemand == null || relationsOnDemand.size() == 0); + + Assert.assertEquals(entities.size(), 21); + Assert.assertEquals(relations.size(), 20); + Assert.assertEquals(inputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.INPUT); + Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5); + Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId); + + //Get lineage info on-demand with input and output limit as 3 + ApplicationProperties.get().setProperty(AtlasConfiguration.LINEAGE_ON_DEMAND_ENABLED.getPropertyName(), true); + + LineageOnDemandConstraints lineageConstraints = new LineageOnDemandConstraints(AtlasLineageInfo.LineageDirection.INPUT, 3, 3, 5); + Map<String, LineageOnDemandConstraints> lineageConstraintsByGuid = new HashMap<>(); + lineageConstraintsByGuid.put(tableId, lineageConstraints); + + if (!isLineageOnDemandEnabled) { + Assert.fail(AtlasErrorCode.LINEAGE_ON_DEMAND_NOT_ENABLED.getFormattedErrorMessage(ATLAS_LINEAGE_ON_DEMAND_ENABLED)); + } + + AtlasLineageInfo inputLineageInfoOnDemand = atlasClientV2.getLineageInfoOnDemand(tableId, lineageConstraintsByGuid); + Assert.assertNotNull(inputLineageInfoOnDemand); + entities = inputLineageInfoOnDemand.getGuidEntityMap(); + Assert.assertNotNull(entities); + + relations = inputLineageInfoOnDemand.getRelations(); + Assert.assertNotNull(relations); + + relationsOnDemand = inputLineageInfoOnDemand.getRelationsOnDemand(); + Assert.assertNotNull(relationsOnDemand); + + Assert.assertEquals(entities.size(), 7); + Assert.assertEquals(relations.size(), 6); + Assert.assertEquals(relationsOnDemand.size(), 1); + + Assert.assertEquals(inputLineageInfoOnDemand.getLineageDirection(), AtlasLineageInfo.LineageDirection.INPUT); + Assert.assertEquals(inputLineageInfoOnDemand.getLineageDepth(), 5); + Assert.assertEquals(inputLineageInfoOnDemand.getBaseEntityGuid(), tableId); + + AtlasLineageInfo.LineageInfoOnDemand relationsOnDemandByTableId = relationsOnDemand.get(tableId); + Assert.assertNotNull(relationsOnDemandByTableId); + + boolean hasMoreInputs = relationsOnDemandByTableId.hasMoreInputs(); + Assert.assertTrue(hasMoreInputs); + + } + @Test public void testGetLineageInfoByAttribute() throws Exception { Map<String, String> attributeMap = new HashMap<>(); @@ -129,5 +195,19 @@ public class LineageClientV2IT extends DataSetLineageJerseyResourceIT { loadProcess("loadSalesMonthly" + randomString(), "John ETL", Collections.singletonList(salesFactDaily), Collections.singletonList(salesFactMonthly), "create table as select ", "plan", "id", "graph"); + + salesMonthlyTableOnDemand = "sales_fact_monthly_mv_on_demand_" + randomString(); + Id salesFactMonthlyOnDemand = + table(salesMonthlyTableOnDemand, "sales fact monthly materialized view", salesDB, "Jane BI", + "MANAGED", salesFactColumns); + + for (int i = 1; i <= 10; i++) { + Id salesFactDailyOnDemand = + table("sales_fact_daily_mv_on_demand_" + randomString() + "_" + i, "sales fact daily materialized view -"+i, salesDB, + "Joe BI", "MANAGED", salesFactColumns); + + loadProcess("loadSalesMonthly" + randomString() + "_" + i, "John ETL", Collections.singletonList(salesFactDailyOnDemand), + Collections.singletonList(salesFactMonthlyOnDemand), "create table as select ", "plan", "id", "graph"); + } } } diff --git a/webapp/src/test/resources/atlas-application.properties b/webapp/src/test/resources/atlas-application.properties index 2f433295d..b863ed84b 100644 --- a/webapp/src/test/resources/atlas-application.properties +++ b/webapp/src/test/resources/atlas-application.properties @@ -131,3 +131,6 @@ atlas.search.gremlin.enable=true ######### Configure use of Tasks ######### atlas.tasks.enabled=false atlas.debug.metrics.enabled=true + +######### Configure on-demand lineage ######### +atlas.lineage.on.demand.enabled=true