Repository: incubator-atlas Updated Branches: refs/heads/master 2119666fd -> ea6c3cb5a
ATLAS-1234: Lineage REST API - v2 Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ea6c3cb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ea6c3cb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ea6c3cb5 Branch: refs/heads/master Commit: ea6c3cb5a0c69ae76fe6a6d337080fe87c48aa5b Parents: 2119666 Author: Sarath Subramanian <[email protected]> Authored: Thu Oct 20 10:59:04 2016 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Nov 11 16:43:23 2016 -0800 ---------------------------------------------------------------------- .../org/apache/atlas/repository/Constants.java | 3 + .../java/org/apache/atlas/AtlasErrorCode.java | 3 + .../atlas/model/lineage/AtlasLineageInfo.java | 206 +++++++++++ .../model/lineage/AtlasLineageService.java | 34 ++ release-log.txt | 1 + .../apache/atlas/RepositoryMetadataModule.java | 3 + .../atlas/discovery/EntityLineageService.java | 209 +++++++++++ .../atlas/lineage/EntityLineageServiceTest.java | 347 +++++++++++++++++++ .../org/apache/atlas/web/rest/LineageREST.java | 75 ++++ .../EntityLineageJerseyResourceIT.java | 253 ++++++++++++++ 10 files changed, 1134 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/common/src/main/java/org/apache/atlas/repository/Constants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 4a68317..cc184a5 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -86,6 +86,9 @@ public final class Constants { public static final String FULLTEXT_INDEX = "fulltext_index"; + public static final String QUALIFIED_NAME = "Referenceable.qualifiedName"; + public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName"; + private Constants() { } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index fe38fba..8e0d164 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -46,6 +46,9 @@ public enum AtlasErrorCode { TYPE_NAME_NOT_FOUND(404, "ATLAS4041E", "Given typename {0} was invalid"), TYPE_GUID_NOT_FOUND(404, "ATLAS4042E", "Given type guid {0} was invalid"), EMPTY_RESULTS(404, "ATLAS4044E", "No result found for {0}"), + INSTANCE_GUID_NOT_FOUND(404, "ATLAS4045E", "Given instance guid {0} is invalid"), + INSTANCE_LINEAGE_INVALID_PARAMS(404, "ATLAS4046E", "Invalid lineage query parameters passed {0}: {1}"), + INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"), TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"), TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java new file mode 100644 index 0000000..61b7f91 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageInfo.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.lineage; + +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasLineageInfo implements Serializable { + private String baseEntityGuid; + private LineageDirection lineageDirection; + private int lineageDepth; + private Map<String, AtlasEntityHeader> guidEntityMap; + private Set<LineageRelation> relations; + + public AtlasLineageInfo() {} + + public enum LineageDirection { INPUT, OUTPUT, BOTH } + + /** + * Captures lineage information for an entity instance like hive_table + + * @param baseEntityGuid guid of the lineage entity . + * @param lineageDirection direction of lineage, can be INPUT, OUTPUT or INPUT_AND_OUTPUT + * @param lineageDepth lineage depth to be fetched. + * @param guidEntityMap map of entity guid to AtlasEntityHeader (minimal entity info) + * @param relations list of lineage relations for the entity (fromEntityId -> toEntityId) + */ + public AtlasLineageInfo(String baseEntityGuid, Map<String, AtlasEntityHeader> guidEntityMap, + Set<LineageRelation> relations, LineageDirection lineageDirection, int lineageDepth) { + this.baseEntityGuid = baseEntityGuid; + this.lineageDirection = lineageDirection; + this.lineageDepth = lineageDepth; + this.guidEntityMap = guidEntityMap; + this.relations = relations; + } + + public String getBaseEntityGuid() { + return baseEntityGuid; + } + + public void setBaseEntityGuid(String baseEntityGuid) { + this.baseEntityGuid = baseEntityGuid; + } + + public Map<String, AtlasEntityHeader> getGuidEntityMap() { + return guidEntityMap; + } + + public void setGuidEntityMap(Map<String, AtlasEntityHeader> guidEntityMap) { + this.guidEntityMap = guidEntityMap; + } + + public Set<LineageRelation> getRelations() { + return relations; + } + + public void setRelations(Set<LineageRelation> relations) { + this.relations = relations; + } + + public LineageDirection getLineageDirection() { + return lineageDirection; + } + + public void setLineageDirection(LineageDirection lineageDirection) { + this.lineageDirection = lineageDirection; + } + + public int getLineageDepth() { + return lineageDepth; + } + + public void setLineageDepth(int lineageDepth) { + this.lineageDepth = lineageDepth; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AtlasLineageInfo that = (AtlasLineageInfo) o; + + if (baseEntityGuid != null ? !baseEntityGuid.equals(that.baseEntityGuid) : that.baseEntityGuid != null) return false; + if (lineageDepth != that.lineageDepth) return false; + if (guidEntityMap != null ? !guidEntityMap.equals(that.guidEntityMap) : that.guidEntityMap != null) return false; + if (relations != null ? !relations.equals(that.relations) : that.relations != null) return false; + return lineageDirection == that.lineageDirection; + } + + @Override + public int hashCode() { + int result = guidEntityMap != null ? guidEntityMap.hashCode() : 0; + result = 31 * result + (relations != null ? relations.hashCode() : 0); + result = 31 * result + (lineageDirection != null ? lineageDirection.hashCode() : 0); + result = 31 * result + lineageDepth; + result = 31 * result + (baseEntityGuid != null ? baseEntityGuid.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "AtlasLineageInfo{" + + "baseEntityGuid=" + baseEntityGuid + + ", guidEntityMap=" + guidEntityMap + + ", relations=" + relations + + ", lineageDirection=" + lineageDirection + + ", lineageDepth=" + lineageDepth + + '}'; + } + + @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.PROPERTY) + public static class LineageRelation { + private String fromEntityId; + private String toEntityId; + + public LineageRelation() { } + + public LineageRelation(String fromEntityId, String toEntityId) { + this.fromEntityId = fromEntityId; + this.toEntityId = toEntityId; + } + + public String getFromEntityId() { + return fromEntityId; + } + + public void setFromEntityId(String fromEntityId) { + this.fromEntityId = fromEntityId; + } + + public String getToEntityId() { + return toEntityId; + } + + public void setToEntityId(String toEntityId) { + this.toEntityId = toEntityId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LineageRelation that = (LineageRelation) o; + + if (fromEntityId != null ? !fromEntityId.equals(that.fromEntityId) : that.fromEntityId != null) + return false; + return toEntityId != null ? toEntityId.equals(that.toEntityId) : that.toEntityId == null; + + } + + @Override + public int hashCode() { + int result = fromEntityId != null ? fromEntityId.hashCode() : 0; + result = 31 * result + (toEntityId != null ? toEntityId.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "LineageRelation{" + + "fromEntityId='" + fromEntityId + '\'' + + ", toEntityId='" + toEntityId + '\'' + + '}'; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java new file mode 100644 index 0000000..fc58f58 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/lineage/AtlasLineageService.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.model.lineage; + + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; + +public interface AtlasLineageService { + /** + * @param entityGuid unique ID of the entity + * @param direction direction of lineage - INPUT, OUTPUT or BOTH + * @param depth number of hops in lineage + * @return AtlasLineageInfo + */ + AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 651a2d4..acc5734 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ALL CHANGES: +ATLAS-1234 Lineage REST API - v2 ([email protected] via mneethiraj) ATLAS-1276 fix for webapp test failures (ayubkhan via mneethiraj) ATLAS-1278 Added API to get typedef header info (apoorvnaik via mneethiraj) ATLAS-1192 Atlas IE support (kevalbhatt) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index aabf269..d3903fb 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -26,11 +26,13 @@ import com.google.inject.multibindings.Multibinder; import org.aopalliance.intercept.MethodInterceptor; import org.apache.atlas.discovery.DataSetLineageService; import org.apache.atlas.discovery.DiscoveryService; +import org.apache.atlas.discovery.EntityLineageService; import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.listener.TypesChangeListener; +import org.apache.atlas.model.lineage.AtlasLineageService; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.audit.EntityAuditListener; import org.apache.atlas.repository.audit.EntityAuditRepository; @@ -94,6 +96,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton(); + bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton(); Configuration configuration = getConfiguration(); bindAuditRepository(binder(), configuration); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java new file mode 100644 index 0000000..14bf143 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity.Status; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.AtlasLineageService; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.commons.collections.CollectionUtils; + +import javax.inject.Inject; +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EntityLineageService implements AtlasLineageService { + private static final String INPUT_PROCESS_EDGE = "__Process.inputs"; + private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs"; + + private final AtlasGraph graph; + + /** + * Gremlin query to retrieve input/output lineage for specified depth on a DataSet entity. + * return list of Atlas vertices paths. + */ + private static final String PARTIAL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." + + "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " + + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + + "path().toList()"; + + /** + * Gremlin query to retrieve all (no fixed depth) input/output lineage for a DataSet entity. + * return list of Atlas vertices paths. + */ + private static final String FULL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." + + "loop('src', {((it.path.contains(it.object)) ? false : true)}, " + + "{((it.object.'__superTypeNames') ? " + + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + + "path().toList()"; + + @Inject + EntityLineageService() throws DiscoveryException { + this.graph = AtlasGraphProvider.getGraphInstance(); + } + + @Override + public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { + AtlasLineageInfo lineageInfo; + + if (!entityExists(guid)) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } + + if (direction != null) { + if (direction.equals(LineageDirection.INPUT)) { + lineageInfo = getLineageInfo(guid, LineageDirection.INPUT, depth); + } else if (direction.equals(LineageDirection.OUTPUT)) { + lineageInfo = getLineageInfo(guid, LineageDirection.OUTPUT, depth); + } else if (direction.equals(LineageDirection.BOTH)) { + lineageInfo = getBothLineageInfo(guid, depth); + } else { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString()); + } + } else { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null); + } + + return lineageInfo; + } + + private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException { + Map<String, AtlasEntityHeader> entities = new HashMap<String, AtlasEntityHeader>(); + Set<LineageRelation> relations = new HashSet<LineageRelation>(); + String lineageQuery = getLineageQuery(guid, direction, depth); + + try { + List paths = (List) graph.executeGremlinScript(lineageQuery, true); + + if (CollectionUtils.isNotEmpty(paths)) { + for (Object path : paths) { + if (path instanceof List) { + List vertices = (List) path; + + if (CollectionUtils.isNotEmpty(vertices)) { + AtlasEntityHeader prev = null; + + for (Object vertex : vertices) { + AtlasEntityHeader entity = toAtlasEntityHeader(vertex); + + if (!entities.containsKey(entity.getGuid())) { + entities.put(entity.getGuid(), entity); + } + + if (prev != null) { + if (direction.equals(LineageDirection.INPUT)) { + relations.add(new LineageRelation(entity.getGuid(), prev.getGuid())); + } else if (direction.equals(LineageDirection.OUTPUT)) { + relations.add(new LineageRelation(prev.getGuid(), entity.getGuid())); + } + } + prev = entity; + } + } + } + } + } + + } catch (ScriptException e) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED, lineageQuery); + } + + return new AtlasLineageInfo(guid, entities, relations, direction, depth); + } + + private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException { + AtlasLineageInfo inputLineage = getLineageInfo(guid, LineageDirection.INPUT, depth); + AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth); + AtlasLineageInfo ret = inputLineage; + + ret.getRelations().addAll(outputLineage.getRelations()); + ret.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap()); + ret.setLineageDirection(LineageDirection.BOTH); + + return ret; + } + + private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException { + String lineageQuery = null; + + if (direction.equals(LineageDirection.INPUT)) { + if (depth < 1) { + lineageQuery = String.format(FULL_LINEAGE_QUERY, entityGuid, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE); + } else { + lineageQuery = String.format(PARTIAL_LINEAGE_QUERY, entityGuid, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE, depth); + } + + } else if (direction.equals(LineageDirection.OUTPUT)) { + if (depth < 1) { + lineageQuery = String.format(FULL_LINEAGE_QUERY, entityGuid, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE); + } else { + lineageQuery = String.format(PARTIAL_LINEAGE_QUERY, entityGuid, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE, depth); + } + } + + return lineageQuery; + } + + private AtlasEntityHeader toAtlasEntityHeader(Object vertexObj) { + AtlasEntityHeader ret = new AtlasEntityHeader(); + + if (vertexObj instanceof AtlasVertex) { + AtlasVertex vertex = (AtlasVertex) vertexObj; + ret.setTypeName(vertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class)); + ret.setGuid(vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class)); + ret.setDisplayText(vertex.getProperty(Constants.QUALIFIED_NAME, String.class)); + + String state = vertex.getProperty(Constants.STATE_PROPERTY_KEY, String.class); + Status status = (state.equalsIgnoreCase("ACTIVE") ? Status.STATUS_ACTIVE : Status.STATUS_DELETED); + ret.setStatus(status); + } + + return ret; + } + + private boolean entityExists(String guid) { + boolean ret = false; + Iterator<AtlasVertex> results = graph.query() + .has(Constants.GUID_PROPERTY_KEY, guid) + .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE) + .vertices().iterator(); + + while (results.hasNext()) { + return true; + } + + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java new file mode 100644 index 0000000..b1dac9d --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.lineage; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.BaseRepositoryTest; +import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.discovery.EntityLineageService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity.Status; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.commons.collections.ArrayStack; +import org.apache.commons.lang.RandomStringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.testng.Assert.*; + +/** + * Unit tests for the new v2 Instance LineageService. + */ +@Guice(modules = RepositoryMetadataModule.class) +public class EntityLineageServiceTest extends BaseRepositoryTest { + + @Inject + private EntityLineageService lineageService; + + @BeforeClass + public void setUp() throws Exception { + super.setUp(); + } + + @AfterClass + public void tearDown() throws Exception { + super.tearDown(); + } + + /** + * Circular Lineage Test. + */ + @Test + public void testCircularLineage() throws Exception{ + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "table2"); + AtlasLineageInfo circularLineage = getInputLineageInfo(entityGuid, 5); + + assertNotNull(circularLineage); + System.out.println("circular lineage = " + circularLineage); + + Map<String, AtlasEntityHeader> entities = circularLineage.getGuidEntityMap(); + assertNotNull(entities); + + Set<LineageRelation> relations = circularLineage.getRelations(); + assertNotNull(relations); + + Assert.assertEquals(entities.size(), 4); + Assert.assertEquals(relations.size(), 4); + Assert.assertEquals(circularLineage.getLineageDepth(), 5); + Assert.assertEquals(circularLineage.getLineageDirection(), LineageDirection.INPUT); + + assertTrue(entities.containsKey(circularLineage.getBaseEntityGuid())); + } + + /** + * Input Lineage Tests. + */ + @Test(dataProvider = "invalidQueryParamsProvider") + public void testGetInputLineageInfoInvalidParams(final String guid, final AtlasLineageInfo.LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception { + testInvalidQueryParams(errorCode, new Invoker() { + @Override + void run() throws AtlasBaseException { + lineageService.getAtlasLineageInfo(guid, direction, depth); + } + }); + } + + @Test + public void testGetInputLineageInfo() throws Exception { + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv"); + AtlasLineageInfo inputLineage = getInputLineageInfo(entityGuid, 4); + + assertNotNull(inputLineage); + System.out.println("input lineage = " + inputLineage); + + Map<String, AtlasEntityHeader> entities = inputLineage.getGuidEntityMap(); + assertNotNull(entities); + + Set<LineageRelation> relations = inputLineage.getRelations(); + assertNotNull(relations); + + Assert.assertEquals(entities.size(), 6); + Assert.assertEquals(relations.size(), 5); + Assert.assertEquals(inputLineage.getLineageDepth(), 4); + Assert.assertEquals(inputLineage.getLineageDirection(), LineageDirection.INPUT); + + assertTrue(entities.containsKey(inputLineage.getBaseEntityGuid())); + } + + /** + * Output Lineage Tests. + */ + @Test(dataProvider = "invalidQueryParamsProvider") + public void testGetOutputLineageInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception { + testInvalidQueryParams(errorCode, new Invoker() { + @Override + void run() throws AtlasBaseException { + lineageService.getAtlasLineageInfo(guid, direction, depth); + } + }); + } + + @Test + public void testGetOutputLineageInfo() throws Exception { + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact"); + AtlasLineageInfo outputLineage = getOutputLineageInfo(entityGuid, 4); + + assertNotNull(outputLineage); + System.out.println("output lineage = " + outputLineage); + + Map<String, AtlasEntityHeader> entities = outputLineage.getGuidEntityMap(); + assertNotNull(entities); + + Set<LineageRelation> relations = outputLineage.getRelations(); + assertNotNull(relations); + + Assert.assertEquals(entities.size(), 5); + Assert.assertEquals(relations.size(), 4); + Assert.assertEquals(outputLineage.getLineageDepth(), 4); + Assert.assertEquals(outputLineage.getLineageDirection(), LineageDirection.OUTPUT); + + assertTrue(entities.containsKey(outputLineage.getBaseEntityGuid())); + } + + /** + * Both Lineage Tests. + */ + @Test(dataProvider = "invalidQueryParamsProvider") + public void testGetLineageInfoInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception { + testInvalidQueryParams(errorCode, new Invoker() { + @Override + void run() throws AtlasBaseException { + lineageService.getAtlasLineageInfo(guid, direction, depth); + } + }); + } + + @Test + public void testGetLineageInfo() throws Exception { + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv"); + AtlasLineageInfo bothLineage = getBothLineageInfo(entityGuid, 5); + + assertNotNull(bothLineage); + System.out.println("both lineage = " + bothLineage); + + Map<String, AtlasEntityHeader> entities = bothLineage.getGuidEntityMap(); + assertNotNull(entities); + + Set<LineageRelation> relations = bothLineage.getRelations(); + assertNotNull(relations); + + Assert.assertEquals(entities.size(), 6); + Assert.assertEquals(relations.size(), 5); + Assert.assertEquals(bothLineage.getLineageDepth(), 5); + Assert.assertEquals(bothLineage.getLineageDirection(), AtlasLineageInfo.LineageDirection.BOTH); + + assertTrue(entities.containsKey(bothLineage.getBaseEntityGuid())); + } + + @DataProvider(name = "invalidQueryParamsProvider") + private Object[][] params() throws Exception { + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv"); + + // String guid, LineageDirection direction, int depth, AtlasErrorCode errorCode + + return new Object[][]{ + {"", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND}, + {" ", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND}, + {null, null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND}, + {"invalidGuid", LineageDirection.OUTPUT, 6, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND}, + {entityGuid, null, -10, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS}, + {entityGuid, null, 5, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS} + }; + } + + abstract class Invoker { + abstract void run() throws AtlasBaseException; + } + + public void testInvalidQueryParams(AtlasErrorCode expectedErrorCode, Invoker Invoker) throws Exception { + try { + Invoker.run(); + fail("Expected " + expectedErrorCode.toString()); + } catch(AtlasBaseException e) { + assertEquals(e.getAtlasErrorCode(), expectedErrorCode); + } + } + + private AtlasLineageInfo getInputLineageInfo(String guid, int depth) throws Exception { + return lineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, depth); + } + + private AtlasLineageInfo getOutputLineageInfo(String guid, int depth) throws Exception { + return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.OUTPUT, depth); + } + + private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws Exception { + return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.BOTH, depth); + } + + @Test + public void testNewLineageWithDelete() throws Exception { + String tableName = "table" + random(); + createTable(tableName, 3, true); + String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", tableName); + + AtlasLineageInfo inputLineage = getInputLineageInfo(entityGuid, 5); + assertNotNull(inputLineage); + System.out.println("input lineage = " + inputLineage); + + Map<String, AtlasEntityHeader> entitiesInput = inputLineage.getGuidEntityMap(); + assertNotNull(entitiesInput); + assertEquals(entitiesInput.size(), 3); + + Set<LineageRelation> relationsInput = inputLineage.getRelations(); + assertNotNull(relationsInput); + assertEquals(relationsInput.size(), 2); + + AtlasEntityHeader tableEntityInput = entitiesInput.get(entityGuid); + assertEquals(tableEntityInput.getStatus(), Status.STATUS_ACTIVE); + + AtlasLineageInfo outputLineage = getOutputLineageInfo(entityGuid, 5); + assertNotNull(outputLineage); + System.out.println("output lineage = " + outputLineage); + + Map<String, AtlasEntityHeader> entitiesOutput = outputLineage.getGuidEntityMap(); + assertNotNull(entitiesOutput); + assertEquals(entitiesOutput.size(), 3); + + Set<LineageRelation> relationsOutput = outputLineage.getRelations(); + assertNotNull(relationsOutput); + assertEquals(relationsOutput.size(), 2); + + AtlasEntityHeader tableEntityOutput = entitiesOutput.get(entityGuid); + assertEquals(tableEntityOutput.getStatus(), Status.STATUS_ACTIVE); + + AtlasLineageInfo bothLineage = getBothLineageInfo(entityGuid, 5); + assertNotNull(bothLineage); + System.out.println("both lineage = " + bothLineage); + + Map<String, AtlasEntityHeader> entitiesBoth = bothLineage.getGuidEntityMap(); + assertNotNull(entitiesBoth); + assertEquals(entitiesBoth.size(), 5); + + Set<LineageRelation> relationsBoth = bothLineage.getRelations(); + assertNotNull(relationsBoth); + assertEquals(relationsBoth.size(), 4); + + AtlasEntityHeader tableEntityBoth = entitiesBoth.get(entityGuid); + assertEquals(tableEntityBoth.getStatus(), Status.STATUS_ACTIVE); + + //Delete the table entity. Lineage for entity returns the same results as before. + //Lineage for table name throws EntityNotFoundException + AtlasClient.EntityResult deleteResult = repository.deleteEntities(Arrays.asList(entityGuid)); + assertTrue(deleteResult.getDeletedEntities().contains(entityGuid)); + + inputLineage = getInputLineageInfo(entityGuid, 5); + tableEntityInput = inputLineage.getGuidEntityMap().get(entityGuid); + assertEquals(tableEntityInput.getStatus(), Status.STATUS_DELETED); + assertEquals(inputLineage.getGuidEntityMap().size(), 3); + + outputLineage = getOutputLineageInfo(entityGuid, 5); + tableEntityOutput = outputLineage.getGuidEntityMap().get(entityGuid); + assertEquals(tableEntityOutput.getStatus(), Status.STATUS_DELETED); + assertEquals(outputLineage.getGuidEntityMap().size(), 3); + + bothLineage = getBothLineageInfo(entityGuid, 5); + tableEntityBoth = bothLineage.getGuidEntityMap().get(entityGuid); + assertEquals(tableEntityBoth.getStatus(), Status.STATUS_DELETED); + assertEquals(bothLineage.getGuidEntityMap().size(), 5); + + } + + private void createTable(String tableName, int numCols, boolean createLineage) throws Exception { + String dbId = getEntityId(DATABASE_TYPE, "name", "Sales"); + Id salesDB = new Id(dbId, 0, DATABASE_TYPE); + + //Create the entity again and schema should return the new schema + List<Referenceable> columns = new ArrayStack(); + for (int i = 0; i < numCols; i++) { + columns.add(column("col" + random(), "int", "column descr")); + } + + Referenceable sd = + storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, + ImmutableList.of(column("time_id", "int", "time id"))); + + Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns); + if (createLineage) { + Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns); + Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns); + loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable), + ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL"); + loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table), + ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL"); + } + } + + private String random() { + return RandomStringUtils.randomAlphanumeric(5); + } + + private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception { + return repository.getEntityDefinition(typeName, attributeName, attributeValue).getId()._getId(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java new file mode 100644 index 0000000..effd29f --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/LineageREST.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.rest; + + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.AtlasLineageService; +import org.apache.atlas.web.util.Servlets; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; + +@Path("v2/lineage") +@Singleton +public class LineageREST { + private final AtlasLineageService atlasLineageService; + private static final String DEFAULT_DIRECTION = "BOTH"; + private static final String DEFAULT_DEPTH = "3"; + + @Context + private HttpServletRequest httpServletRequest; + + @Inject + public LineageREST(AtlasLineageService atlasLineageService) { + this.atlasLineageService = atlasLineageService; + } + + /** + * Returns lineage info about entity. + * @param guid - unique entity id + * @param direction - input, output or both + * @param depth - number of hops for lineage + * @return AtlasLineageInfo + * @throws AtlasBaseException + */ + @GET + @Path("/{guid}") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid, + @QueryParam("direction") @DefaultValue(DEFAULT_DIRECTION) LineageDirection direction, + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) throws AtlasBaseException { + + AtlasLineageInfo ret = atlasLineageService.getAtlasLineageInfo(guid, direction, depth); + + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea6c3cb5/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java new file mode 100644 index 0000000..f0455c0 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityLineageJerseyResourceIT.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.resources; + +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.web.util.Servlets; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Entity Lineage v2 Integration Tests. + */ +public class EntityLineageJerseyResourceIT extends BaseResourceIT { + private static final String BASE_URI = "api/atlas/v2/lineage/"; + private static final String INPUT_DIRECTION = "INPUT"; + private static final String OUTPUT_DIRECTION = "OUTPUT"; + private static final String BOTH_DIRECTION = "BOTH"; + private static final String DIRECTION_PARAM = "direction"; + private static final String DEPTH_PARAM = "depth"; + + private String salesFactTable; + private String salesMonthlyTable; + private String salesDBName; + Gson gson = new Gson(); + + @BeforeClass + public void setUp() throws Exception { + super.setUp(); + + createTypeDefinitions(); + setupInstances(); + } + + @Test + public void testInputLineageInfo() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId(); + WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, INPUT_DIRECTION). + queryParam(DEPTH_PARAM, "5"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("input lineage info = " + responseAsString); + + AtlasLineageInfo inputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class); + + Map<String, AtlasEntityHeader> entities = inputLineageInfo.getGuidEntityMap(); + Assert.assertNotNull(entities); + + Set<AtlasLineageInfo.LineageRelation> relations = inputLineageInfo.getRelations(); + Assert.assertNotNull(relations); + + Assert.assertEquals(entities.size(), 6); + Assert.assertEquals(relations.size(), 5); + Assert.assertEquals(inputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.INPUT); + Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5); + Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId); + } + + @Test + public void testOutputLineageInfo() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId(); + WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, OUTPUT_DIRECTION). + queryParam(DEPTH_PARAM, "5"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("output lineage info = " + responseAsString); + + AtlasLineageInfo outputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class); + + Map<String, AtlasEntityHeader> entities = outputLineageInfo.getGuidEntityMap(); + Assert.assertNotNull(entities); + + Set<AtlasLineageInfo.LineageRelation> relations = outputLineageInfo.getRelations(); + Assert.assertNotNull(relations); + + Assert.assertEquals(entities.size(), 5); + Assert.assertEquals(relations.size(), 4); + Assert.assertEquals(outputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.OUTPUT); + Assert.assertEquals(outputLineageInfo.getLineageDepth(), 5); + Assert.assertEquals(outputLineageInfo.getBaseEntityGuid(), tableId); + } + + @Test + public void testLineageInfo() throws Exception { + String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId(); + WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, BOTH_DIRECTION). + queryParam(DEPTH_PARAM, "5"); + + ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE) + .method(HttpMethod.GET, ClientResponse.class); + Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); + + String responseAsString = clientResponse.getEntity(String.class); + Assert.assertNotNull(responseAsString); + System.out.println("both lineage info = " + responseAsString); + + AtlasLineageInfo bothLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class); + + Map<String, AtlasEntityHeader> entities = bothLineageInfo.getGuidEntityMap(); + Assert.assertNotNull(entities); + + Set<AtlasLineageInfo.LineageRelation> relations = bothLineageInfo.getRelations(); + Assert.assertNotNull(relations); + + Assert.assertEquals(entities.size(), 6); + Assert.assertEquals(relations.size(), 5); + Assert.assertEquals(bothLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.BOTH); + Assert.assertEquals(bothLineageInfo.getLineageDepth(), 5); + Assert.assertEquals(bothLineageInfo.getBaseEntityGuid(), tableId); + } + + private void setupInstances() throws Exception { + salesDBName = "Sales" + randomString(); + Id salesDB = database(salesDBName, "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales"); + + List<Referenceable> salesFactColumns = ImmutableList + .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"), + column("customer_id", "int", "customer id"), + column("sales", "double", "product id")); + + salesFactTable = "sales_fact" + randomString(); + Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns); + + List<Referenceable> timeDimColumns = ImmutableList + .of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"), + column("weekDay", "int", "week Day")); + + Id timeDim = + table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL", + timeDimColumns); + + Id reportingDB = + database("Reporting" + randomString(), "reporting database", "Jane BI", + "hdfs://host:8000/apps/warehouse/reporting"); + + Id salesFactDaily = + table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB, + "Joe BI", "MANAGED", salesFactColumns); + + loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim), + ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph"); + + salesMonthlyTable = "sales_fact_monthly_mv" + randomString(); + Id salesFactMonthly = + table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI", + "MANAGED", salesFactColumns); + + loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily), + ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph"); + } + + Id database(String name, String description, String owner, String locationUri, String... traitNames) + throws Exception { + Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("locationUri", locationUri); + referenceable.set("createTime", System.currentTimeMillis()); + + return createInstance(referenceable); + } + + Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("dataType", dataType); + referenceable.set("comment", comment); + + return referenceable; + } + + Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns, + String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("tableType", tableType); + referenceable.set("createTime", System.currentTimeMillis()); + referenceable.set("lastAccessTime", System.currentTimeMillis()); + referenceable.set("retention", System.currentTimeMillis()); + + referenceable.set("db", dbId); + referenceable.set("columns", columns); + + return createInstance(referenceable); + } + + Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText, + String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + referenceable.set("user", user); + referenceable.set("startTime", System.currentTimeMillis()); + referenceable.set("endTime", System.currentTimeMillis() + 10000); + + referenceable.set("inputs", inputTables); + referenceable.set("outputs", outputTables); + + referenceable.set("queryText", queryText); + referenceable.set("queryPlan", queryPlan); + referenceable.set("queryId", queryId); + referenceable.set("queryGraph", queryGraph); + + return createInstance(referenceable); + } +}
