Repository: incubator-atlas Updated Branches: refs/heads/master 7a1b8c15f -> eec1201c9
LineageResource API needs to map to the new LineageREST API Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/eec1201c Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/eec1201c Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/eec1201c Branch: refs/heads/master Commit: eec1201c9ff604b0697af2def66ade6d02ab0603 Parents: 7a1b8c1 Author: Sarath Subramanian <[email protected]> Authored: Tue Nov 29 15:32:16 2016 +0530 Committer: Vimal Sharma <[email protected]> Committed: Tue Nov 29 15:36:23 2016 +0530 ---------------------------------------------------------------------- release-log.txt | 1 + .../atlas/web/resources/LineageResource.java | 46 +++---- .../org/apache/atlas/web/util/LineageUtils.java | 134 +++++++++++++++++++ 3 files changed, 158 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ae5c7dc..103b929 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ALL CHANGES: +ATLAS-1300 LineageResource API needs to map to the new LineageREST API ([email protected] via svimal2106) ATLAS-1321 fixed HiveHookIT failures (ayubpathan via mneethiraj) ATLAS-1336 fixed StormHookIT (ayubpathan via mneethiraj) ATLAS-1335 multi-value attribute handling in AtlasStructType to be consistent with TypeSystem for backward compatibility (mneethiraj) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java index 983bbb8..95fce52 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java @@ -22,10 +22,17 @@ import org.apache.atlas.AtlasClient; import org.apache.atlas.aspect.Monitored; import org.apache.atlas.discovery.DiscoveryException; import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.model.lineage.AtlasLineageService; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.SchemaNotFoundException; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.LineageUtils; import org.apache.atlas.web.util.Servlets; +import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +51,10 @@ import javax.ws.rs.core.Response; @Singleton public class LineageResource { private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource"); - private final LineageService lineageService; + private final AtlasLineageService atlasLineageService; + private final LineageService lineageService; + private final AtlasTypeRegistry typeRegistry; /** * Created by the Guice ServletModule and injected with the @@ -55,8 +63,10 @@ public class LineageResource { * @param lineageService lineage service handle */ @Inject - public LineageResource(LineageService lineageService) { - this.lineageService = lineageService; + public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) { + this.lineageService = lineageService; + this.atlasLineageService = atlasLineageService; + this.typeRegistry = typeRegistry; } /** @@ -73,20 +83,15 @@ public class LineageResource { LOG.info("Fetching lineage inputs graph for guid={}", guid); try { - final String jsonResult = lineageService.getInputsGraphForEntity(guid); + AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1); + final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + response.put(AtlasClient.RESULTS, new JSONObject(result)); return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("entity not found for guid={}", guid); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { - LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (Throwable e) { + } catch (AtlasBaseException | JSONException e) { LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); } @@ -106,20 +111,15 @@ public class LineageResource { LOG.info("Fetching lineage outputs graph for entity guid={}", guid); try { - final String jsonResult = lineageService.getOutputsGraphForEntity(guid); + AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1); + final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + response.put(AtlasClient.RESULTS, new JSONObject(result)); return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", guid); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { - LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (Throwable e) { + } catch (AtlasBaseException | JSONException e) { LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); } @@ -160,4 +160,4 @@ public class LineageResource { throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/eec1201c/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java new file mode 100644 index 0000000..54ca236 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.util; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.types.TypeSystem; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX; + +public final class LineageUtils { + private LineageUtils() {} + + private static final String VERTICES_ATTR_NAME = "vertices"; + private static final String EDGES_ATTR_NAME = "edges"; + private static final String VERTEX_ID_ATTR_NAME = "vertexId"; + private static final String TEMP_STRUCT_ID_RESULT = "__IdType"; + + private static final AtomicInteger COUNTER = new AtomicInteger(); + + public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException { + String ret = null; + + if (lineageInfo != null) { + Map<String, AtlasEntityHeader> entities = lineageInfo.getGuidEntityMap(); + Set<AtlasLineageInfo.LineageRelation> relations = lineageInfo.getRelations(); + AtlasLineageInfo.LineageDirection direction = lineageInfo.getLineageDirection(); + Map<String, Struct> verticesMap = new HashMap<>(); + + // Lineage Entities mapping -> verticesMap (vertices) + for (String guid : entities.keySet()) { + AtlasEntityHeader entityHeader = entities.get(guid); + + if (isDataSet(entityHeader.getTypeName(), registry)) { + Map<String, Object> vertexIdMap = new HashMap<>(); + TypeSystem.IdType idType = TypeSystem.getInstance().getIdType(); + + vertexIdMap.put(idType.idAttrName(), guid); + vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.STATUS_ACTIVE) ? "ACTIVE" : "DELETED"); + vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName()); + + Map<String, Object> values = new HashMap<>(); + values.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, entityHeader.getDisplayText()); + values.put(VERTEX_ID_ATTR_NAME, constructResultStruct(vertexIdMap, true)); + values.put(AtlasClient.NAME, entityHeader.getDisplayText()); + verticesMap.put(guid, constructResultStruct(values, false)); + } + } + + // Lineage Relations mapping -> edgesMap (edges) + Map<String, List<String>> edgesMap = new HashMap<>(); + + for (AtlasLineageInfo.LineageRelation relation : relations) { + String fromEntityId = relation.getFromEntityId(); + String toEntityId = relation.getToEntityId(); + + if (direction == AtlasLineageInfo.LineageDirection.INPUT) { + if (!edgesMap.containsKey(toEntityId)) { + edgesMap.put(toEntityId, new ArrayList<String>()); + } + edgesMap.get(toEntityId).add(fromEntityId); + + } else if (direction == AtlasLineageInfo.LineageDirection.OUTPUT) { + if (!edgesMap.containsKey(fromEntityId)) { + edgesMap.put(fromEntityId, new ArrayList<String>()); + } + edgesMap.get(fromEntityId).add(toEntityId); + } + } + + Map<String, Object> map = new HashMap<>(); + map.put(VERTICES_ATTR_NAME, verticesMap); + map.put(EDGES_ATTR_NAME, edgesMap); + + ret = InstanceSerialization.toJson(constructResultStruct(map, false), false); + } + + return ret; + } + + private static Struct constructResultStruct(Map<String, Object> values, boolean idType) { + if (idType) { + return new Struct(TEMP_STRUCT_ID_RESULT, values); + } + + return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values); + } + + private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException { + boolean ret = false; + AtlasType type = registry.getType(typeName); + + if (type instanceof AtlasEntityType) { + AtlasEntityType entityType = (AtlasEntityType) type; + ret = entityType.getAllSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_DATASET); + } + + return ret; + } + +}
