ATLAS-1114: Performance improvements for create/update entity (2 of 2)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/89f70609 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/89f70609 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/89f70609 Branch: refs/heads/master Commit: 89f70609645fa53cb5492035c49c2aa4b99031e9 Parents: 6cd6811 Author: Jeff Hagelberg <[email protected]> Authored: Thu Jan 26 16:03:24 2017 -0500 Committer: Jeff Hagelberg <[email protected]> Committed: Thu Jan 26 16:04:51 2017 -0500 ---------------------------------------------------------------------- .../audit/InMemoryEntityAuditRepository.java | 14 +- .../atlas/repository/graph/DeleteHandler.java | 3 +- .../atlas/repository/graph/FullTextMapper.java | 22 +- .../graph/GraphBackedMetadataRepository.java | 55 +++-- .../atlas/repository/graph/GraphHelper.java | 209 ++++++++++++++-- .../graph/TypedInstanceToGraphMapper.java | 247 ++++++++++++------- .../repository/graph/VertexLookupContext.java | 166 +++++++++++++ .../atlas/services/DefaultMetadataService.java | 4 + .../apache/atlas/util/AttributeValueMap.java | 60 +++++ .../org/apache/atlas/util/IndexedInstance.java | 62 +++++ .../atlas/repository/graph/GraphHelperTest.java | 106 ++++++-- .../test/resources/atlas-application.properties | 1 + .../atlas/web/resources/EntityResource.java | 2 +- .../notification/EntityNotificationIT.java | 26 +- .../NotificationHookConsumerIT.java | 42 ++-- .../atlas/web/resources/BaseResourceIT.java | 77 ++++-- .../DataSetLineageJerseyResourceIT.java | 6 +- .../EntityDiscoveryJerseyResourceIT.java | 14 +- .../web/resources/EntityJerseyResourceIT.java | 79 +++--- .../web/resources/EntityV2JerseyResourceIT.java | 151 ++++++------ .../MetadataDiscoveryJerseyResourceIT.java | 8 +- 21 files changed, 997 insertions(+), 357 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index 50a007b..86f1af7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -18,17 +18,17 @@ package org.apache.atlas.repository.audit; -import org.apache.atlas.AtlasException; -import org.apache.atlas.EntityAuditEvent; - -import com.google.inject.Singleton; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; + +import com.google.inject.Singleton; + /** * Entity audit repository where audit events are stored in-memory. 
Used only for integration tests */ @@ -50,8 +50,10 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { } } + //synchronized to avoid concurrent modification exception that occurs if events are added + //while we are iterating through the map @Override - public List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults) + public synchronized List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); String myStartKey = startKey; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java index 9eb086f..4973a33 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java @@ -22,6 +22,7 @@ import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; import static org.apache.atlas.repository.graph.GraphHelper.string; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -70,7 +71,7 @@ public abstract class DeleteHandler { * @param instanceVertices * @throws AtlasException */ - public void deleteEntities(List<AtlasVertex> instanceVertices) throws AtlasException { + public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws AtlasException { RequestContext requestContext = RequestContext.get(); Set<AtlasVertex> deletionCandidateVertices = new HashSet<>(); 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java index 911b1ad..5be8d0b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java @@ -17,10 +17,15 @@ */ package org.apache.atlas.repository.graph; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.atlas.AtlasException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.ITypedInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.EnumValue; @@ -29,23 +34,22 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class FullTextMapper { private static final Logger LOG = LoggerFactory.getLogger(FullTextMapper.class); private final GraphToTypedInstanceMapper graphToTypedInstanceMapper; + private final TypedInstanceToGraphMapper typedInstanceToGraphMapper; private static final GraphHelper graphHelper = GraphHelper.getInstance(); private static final String FULL_TEXT_DELIMITER = " "; private final Map<String, ITypedReferenceableInstance> instanceCache; - FullTextMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper) { + FullTextMapper(TypedInstanceToGraphMapper typedInstanceToGraphMapper, + GraphToTypedInstanceMapper 
graphToTypedInstanceMapper) { this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; + this.typedInstanceToGraphMapper = typedInstanceToGraphMapper; instanceCache = new HashMap<>(); } @@ -126,8 +130,12 @@ public class FullTextMapper { case CLASS: if (followReferences) { - String refGuid = ((ITypedReferenceableInstance) value).getId()._getId(); - AtlasVertex refVertex = graphHelper.getVertexForGUID(refGuid); + Id refId = ((ITypedReferenceableInstance) value).getId(); + String refGuid = refId._getId(); + AtlasVertex refVertex = typedInstanceToGraphMapper.lookupVertex(refId); + if(refVertex == null) { + refVertex = graphHelper.getVertexForGUID(refGuid); + } return mapRecursive(refVertex, false); } break; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index 0c80aed..be02891 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -19,9 +19,11 @@ package org.apache.atlas.repository.graph; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; @@ -306,7 +308,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { if (LOG.isDebugEnabled()) { LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); } - + AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid); List<String> traitNames = 
GraphHelper.getTraitNames(instanceVertex); @@ -331,7 +333,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { } } - + private void updateTraits(AtlasVertex instanceVertex, List<String> traitNames) { // remove the key instanceVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY); @@ -357,8 +359,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL, entitiesUpdated); RequestContext requestContext = RequestContext.get(); - return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), - requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); + return createEntityResultFromContext(requestContext); } catch (AtlasException e) { throw new RepositoryException(e); } @@ -375,13 +376,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity); RequestContext requestContext = RequestContext.get(); - return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), - requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); + return createEntityResultFromContext(requestContext); } catch (AtlasException e) { throw new RepositoryException(e); } } + + @Override @GraphTransaction public AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException { @@ -390,32 +392,41 @@ public class GraphBackedMetadataRepository implements MetadataRepository { throw new IllegalArgumentException("guids must be non-null and non-empty"); } - List<AtlasVertex> vertices = new ArrayList<>(guids.size()); - for (String guid : guids) { - if (guid == null) { - LOG.warn("deleteEntities: Ignoring null guid"); - continue; - } - try { 
- AtlasVertex instanceVertex = graphHelper.getVertexForGUID(guid); - vertices.add(instanceVertex); - } catch (EntityNotFoundException e) { - // Entity does not exist - treat as non-error, since the caller - // wanted to delete the entity and it's already gone. - LOG.info("Deletion request ignored for non-existent entity with guid {}", guid); + // Retrieve vertices for requested guids. + Map<String, AtlasVertex> vertices = graphHelper.getVerticesForGUIDs(guids); + Collection<AtlasVertex> deletionCandidates = vertices.values(); + + if(LOG.isDebugEnabled()) { + for(String guid : guids) { + if(! vertices.containsKey(guid)) { + // Entity does not exist - treat as non-error, since the caller + // wanted to delete the entity and it's already gone. + LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); + } } } + if (deletionCandidates.isEmpty()) { + LOG.info("No deletion candidate entities were found for guids %s", guids); + return new AtlasClient.EntityResult(Collections.<String>emptyList(), Collections.<String>emptyList(), Collections.<String>emptyList()); + } + try { - deleteHandler.deleteEntities(vertices); + deleteHandler.deleteEntities(deletionCandidates); } catch (AtlasException e) { throw new RepositoryException(e); } RequestContext requestContext = RequestContext.get(); - return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), - requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); + return createEntityResultFromContext(requestContext); + } + + private AtlasClient.EntityResult createEntityResultFromContext(RequestContext requestContext) { + return new AtlasClient.EntityResult( + requestContext.getCreatedEntityIds(), + requestContext.getUpdatedEntityIds(), + requestContext.getDeletedEntityIds()); } public AtlasGraph getGraph() { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java 
---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index c4bb61b..ce8d4c7 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -18,9 +18,20 @@ package org.apache.atlas.repository.graph; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Stack; +import java.util.UUID; + import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; @@ -51,13 +62,17 @@ import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.atlas.util.AttributeValueMap; +import org.apache.atlas.util.IndexedInstance; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; /** * Utility class for graph operations. @@ -402,7 +417,7 @@ public final class GraphHelper { * Gets the value of a property that is stored in the graph as a single property value. 
If * a multi-property such as {@link Constants#TRAIT_NAMES_PROPERTY_KEY} or {@link Constants#SUPER_TYPES_PROPERTY_KEY} * is used, an exception will be thrown. - * + * * @param element * @param propertyName * @param clazz @@ -414,7 +429,7 @@ public final class GraphHelper { if (LOG.isDebugEnabled()) { LOG.debug("Reading property {} from {}", actualPropertyName, string(element)); } - + return element.getProperty(actualPropertyName, clazz); } @@ -442,8 +457,8 @@ public final class GraphHelper { } return edge.getProperty(actualPropertyName, Object.class); - } - + } + private static <T extends AtlasElement> String string(T element) { if (element instanceof AtlasVertex) { return string((AtlasVertex) element); @@ -452,10 +467,10 @@ public final class GraphHelper { } return element.toString(); } - + /** * Adds an additional value to a multi-property. - * + * * @param vertex * @param propertyName * @param value @@ -516,6 +531,60 @@ public final class GraphHelper { return findVertex(Constants.GUID_PROPERTY_KEY, guid); } + + /** + * Finds the Vertices that correspond to the given property values. Property + * values that are not found in the graph will not be in the map. + * + * @return propertyValue to AtlasVertex map with the result. + */ + private Map<String, AtlasVertex> getVerticesForPropertyValues(String property, List<String> values) + throws RepositoryException { + + if(values.isEmpty()) { + return Collections.emptyMap(); + } + Collection<String> nonNullValues = new HashSet<>(values.size()); + + for(String value : values) { + if(value != null) { + nonNullValues.add(value); + } + } + + //create graph query that finds vertices with the guids + AtlasGraphQuery query = graph.query(); + query.in(property, nonNullValues); + Iterable<AtlasVertex> results = query.vertices(); + + Map<String, AtlasVertex> result = new HashMap<>(values.size()); + //Process the result, using the guidToIndexMap to figure out where + //each vertex should go in the result list. 
+ for(AtlasVertex vertex : results) { + if(vertex.exists()) { + String propertyValue = vertex.getProperty(property, String.class); + if(LOG.isDebugEnabled()) { + LOG.debug("Found a vertex {} with {} = {}", string(vertex), property, propertyValue); + } + result.put(propertyValue, vertex); + } + } + return result; + } + + + /** + * Finds the Vertices that correspond to the given GUIDs. GUIDs + * that are not found in the graph will not be in the map. + * + * @return GUID to AtlasVertex map with the result. + */ + public Map<String, AtlasVertex> getVerticesForGUIDs(List<String> guids) + throws RepositoryException { + + return getVerticesForPropertyValues(Constants.GUID_PROPERTY_KEY, guids); + } + public static String getQualifiedNameForMapKey(String prefix, String key) { return prefix + "." + key; } @@ -638,6 +707,112 @@ public final class GraphHelper { } /** + * Finds vertices that match at least one unique attribute of the instances specified. The AtlasVertex at a given index in the result corresponds + * to the IReferenceableInstance at that same index that was passed in. The number of elements in the resultant list is guaranteed to match the + * number of instances that were passed in. If no vertex is found for a given instance, that entry will be null in the resultant list. + * + * + * @param classType + * @param instancesForClass + * @return + * @throws AtlasException + */ + public List<AtlasVertex> getVerticesForInstancesByUniqueAttribute(ClassType classType, List<? extends IReferenceableInstance> instancesForClass) throws AtlasException { + + //For each attribute, need to figure out what values to search for and which instance(s) + //those values correspond to. 
+ Map<String, AttributeValueMap> map = new HashMap<String, AttributeValueMap>(); + + for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) { + if (attributeInfo.isUnique) { + String propertyKey = getQualifiedFieldName(classType, attributeInfo.name); + AttributeValueMap mapForAttribute = new AttributeValueMap(); + for(int idx = 0; idx < instancesForClass.size(); idx++) { + IReferenceableInstance instance = instancesForClass.get(idx); + Object value = instance.get(attributeInfo.name); + mapForAttribute.put(value, instance, idx); + } + map.put(propertyKey, mapForAttribute); + } + } + + AtlasVertex[] result = new AtlasVertex[instancesForClass.size()]; + if(map.isEmpty()) { + //no unique attributes + return Arrays.asList(result); + } + + //construct gremlin query + AtlasGraphQuery query = graph.query(); + + query.has(Constants.ENTITY_TYPE_PROPERTY_KEY, classType.getName()); + query.has(Constants.STATE_PROPERTY_KEY,Id.EntityState.ACTIVE.name()); + + List<AtlasGraphQuery> orChildren = new ArrayList<AtlasGraphQuery>(); + + + //build up an or expression to find vertices which match at least one of the unique attribute constraints + //For each unique attribute, we add a within clause to match vertices that have a value of that attribute + //that matches the value in some instance. 
+ for(Map.Entry<String, AttributeValueMap> entry : map.entrySet()) { + AtlasGraphQuery orChild = query.createChildQuery(); + String propertyName = entry.getKey(); + AttributeValueMap valueMap = entry.getValue(); + Set<Object> values = valueMap.getAttributeValues(); + if(values.size() == 1) { + orChild.has(propertyName, values.iterator().next()); + } + else if(values.size() > 1) { + orChild.in(propertyName, values); + } + orChildren.add(orChild); + } + + if(orChildren.size() == 1) { + AtlasGraphQuery child = orChildren.get(0); + query.addConditionsFrom(child); + } + else if(orChildren.size() > 1) { + query.or(orChildren); + } + + Iterable<AtlasVertex> queryResult = query.vertices(); + + + for(AtlasVertex matchingVertex : queryResult) { + Collection<IndexedInstance> matches = getInstancesForVertex(map, matchingVertex); + for(IndexedInstance wrapper : matches) { + result[wrapper.getIndex()]= matchingVertex; + } + } + return Arrays.asList(result); + } + + //finds the instance(s) that correspond to the given vertex + private Collection<IndexedInstance> getInstancesForVertex(Map<String, AttributeValueMap> map, AtlasVertex foundVertex) { + + //loop through the unique attributes. For each attribute, check to see if the vertex property that + //corresponds to that attribute has a value from one or more of the instances that were passed in. + + for(Map.Entry<String, AttributeValueMap> entry : map.entrySet()) { + + String propertyName = entry.getKey(); + AttributeValueMap valueMap = entry.getValue(); + + Object vertexValue = foundVertex.getProperty(propertyName, Object.class); + + Collection<IndexedInstance> instances = valueMap.get(vertexValue); + if(instances != null && instances.size() > 0) { + //return first match. Let the underlying graph determine if this is a problem + //(i.e. if the other unique attributes can be changed safely to match what + //the user requested). 
+ return instances; + } + //try another attribute + } + return Collections.emptyList(); + } + /** * Guid and AtlasVertex combo */ public static class VertexInfo { @@ -779,9 +954,9 @@ public final class GraphHelper { public static ITypedReferenceableInstance getTypedReferenceableInstance(TypeSystem typeSystem, Referenceable entityInstance) throws AtlasException { final String entityTypeName = ParamChecker.notEmpty(entityInstance.getTypeName(), "Entity type cannot be null"); - + ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName); - + //Both assigned id and values are required for full update //classtype.convert() will remove values if id is assigned. So, set temp id, convert and // then replace with original id @@ -839,7 +1014,7 @@ public final class GraphHelper { } } - + public static void dumpToLog(final AtlasGraph<?,?> graph) { LOG.debug("*******************Graph Dump****************************"); LOG.debug("Vertices of {}", graph); @@ -951,7 +1126,7 @@ public final class GraphHelper { } return null; } - + public static boolean elementExists(AtlasElement v) { return v != null && v.exists(); } @@ -972,12 +1147,12 @@ public final class GraphHelper { public static void setListProperty(AtlasVertex instanceVertex, String propertyName, ArrayList<String> value) throws AtlasException { String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); - instanceVertex.setListProperty(actualPropertyName, value); + instanceVertex.setListProperty(actualPropertyName, value); } - + public static List<String> getListProperty(AtlasVertex instanceVertex, String propertyName) { String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); - return instanceVertex.getListProperty(actualPropertyName); + return instanceVertex.getListProperty(actualPropertyName); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java 
---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 4e55bbc..bae8b2a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -17,7 +17,19 @@ */ package org.apache.atlas.repository.graph; -import com.google.inject.Inject; +import static org.apache.atlas.repository.graph.GraphHelper.string; + +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.repository.Constants; @@ -47,18 +59,9 @@ import org.apache.atlas.utils.MD5Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.atlas.repository.graph.GraphHelper.string; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.inject.Inject; public final class TypedInstanceToGraphMapper { @@ -86,40 +89,42 @@ public final class TypedInstanceToGraphMapper { void mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... 
typedInstances) throws AtlasException { + RequestContext requestContext = RequestContext.get(); + Collection<IReferenceableInstance> allNewInstances = new ArrayList<>(); for (ITypedReferenceableInstance typedInstance : typedInstances) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding/updating entity {}", typedInstance); - } + allNewInstances.addAll(walkClassInstances(typedInstance)); + } - Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance); - TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair = - createVerticesAndDiscoverInstances(newInstances); - List<ITypedReferenceableInstance> entitiesToCreate = instancesPair.left; - List<ITypedReferenceableInstance> entitiesToUpdate = instancesPair.right; - FullTextMapper fulltextMapper = new FullTextMapper(graphToTypedInstanceMapper); - switch (operation) { - case CREATE: - List<String> ids = addOrUpdateAttributesAndTraits(operation, entitiesToCreate); - addFullTextProperty(entitiesToCreate, fulltextMapper); - requestContext.recordEntityCreate(ids); - break; + TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair = + createVerticesAndDiscoverInstances(allNewInstances); - case UPDATE_FULL: - case UPDATE_PARTIAL: - ids = addOrUpdateAttributesAndTraits(Operation.CREATE, entitiesToCreate); - requestContext.recordEntityCreate(ids); - ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate); - requestContext.recordEntityUpdate(ids); + List<ITypedReferenceableInstance> entitiesToCreate = instancesPair.left; + List<ITypedReferenceableInstance> entitiesToUpdate = instancesPair.right; - addFullTextProperty(entitiesToCreate, fulltextMapper); - addFullTextProperty(entitiesToUpdate, fulltextMapper); - break; + FullTextMapper fulltextMapper = new FullTextMapper(this, graphToTypedInstanceMapper); + switch (operation) { + case CREATE: + List<String> ids = addOrUpdateAttributesAndTraits(operation, 
entitiesToCreate); + addFullTextProperty(entitiesToCreate, fulltextMapper); + requestContext.recordEntityCreate(ids); + break; - default: - throw new UnsupportedOperationException("Not handled - " + operation); - } + case UPDATE_FULL: + case UPDATE_PARTIAL: + ids = addOrUpdateAttributesAndTraits(Operation.CREATE, entitiesToCreate); + requestContext.recordEntityCreate(ids); + ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate); + requestContext.recordEntityUpdate(ids); + + addFullTextProperty(entitiesToCreate, fulltextMapper); + addFullTextProperty(entitiesToUpdate, fulltextMapper); + break; + + default: + throw new UnsupportedOperationException("Not handled - " + operation); } + } private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance) @@ -257,68 +262,112 @@ public final class TypedInstanceToGraphMapper { List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>(); List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>(); - for (IReferenceableInstance instance : instances) { - if (LOG.isDebugEnabled()) { - LOG.debug("Discovering instance to create/update for {}", instance.toShortString()); - } + Map<Id,AtlasVertex> foundVertices = findExistingVertices(instances); + //cache all the ids + idToVertexMap.putAll(foundVertices); - ITypedReferenceableInstance newInstance; + Set<Id> processedIds = new HashSet<>(); + for(IReferenceableInstance instance : instances) { Id id = instance.getId(); + if(processedIds.contains(id)) { + continue; + } - if (!idToVertexMap.containsKey(id)) { - AtlasVertex instanceVertex; - if (id.isAssigned()) { // has a GUID - if (LOG.isDebugEnabled()) { - LOG.debug("Instance has an assigned id {}", instance.getId()._getId()); - } + AtlasVertex instanceVertex = foundVertices.get(id); + ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName()); - instanceVertex = graphHelper.getVertexForGUID(id.id); - if (!(instance instanceof 
ReferenceableInstance)) { - throw new IllegalStateException( - String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); - } - newInstance = (ITypedReferenceableInstance) instance; - instancesToUpdate.add(newInstance); - - } else { - //Check if there is already an instance with the same unique attribute value - ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName()); - instanceVertex = graphHelper.getVertexForInstanceByUniqueAttribute(classType, instance); - - //no entity with the given unique attribute, create new - if (instanceVertex == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new vertex for instance {}", instance.toShortString()); - } + if(instanceVertex == null) { - newInstance = classType.convert(instance, Multiplicity.REQUIRED); - instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); - instancesToCreate.add(newInstance); + if(LOG.isDebugEnabled()) { + LOG.debug("Creating new vertex for instance {}", instance.toShortString()); + } - //Map only unique attributes for cases of circular references - mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); + ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED); + instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); + instancesToCreate.add(newInstance); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Re-using existing vertex {} for instance {}", string(instanceVertex), instance.toShortString()); - } + //Map only unique attributes for cases of circular references + mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); + idToVertexMap.put(id, instanceVertex); - if (!(instance instanceof ReferenceableInstance)) { - throw new IllegalStateException( - String.format("%s is not of type 
ITypedReferenceableInstance", instance.toShortString())); - } - newInstance = (ITypedReferenceableInstance) instance; - instancesToUpdate.add(newInstance); - } + } + else { + + if(LOG.isDebugEnabled()) { + LOG.debug("Re-using existing vertex {} for instance {}", string(instanceVertex), instance.toShortString()); } - //Set the id in the new instance - idToVertexMap.put(id, instanceVertex); + if (!(instance instanceof ITypedReferenceableInstance)) { + throw new IllegalStateException( + String.format("%s is not of type ITypedReferenceableInstance", instance.toShortString())); + } + ITypedReferenceableInstance existingInstance = (ITypedReferenceableInstance) instance; + instancesToUpdate.add(existingInstance); } + processedIds.add(id); + } return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate); } + private Map<Id,AtlasVertex> findExistingVertices(Collection<IReferenceableInstance> instances) throws AtlasException { + + VertexLookupContext context = new VertexLookupContext(this); + Map<Id,AtlasVertex> result = new HashMap<>(); + + for(IReferenceableInstance instance : instances) { + context.addInstance(instance); + } + + List<Id> instancesToLoad = new ArrayList<>(context.getInstancesToLoadByGuid()); + List<String> guidsToLoad = Lists.transform(instancesToLoad, new Function<Id,String>() { + + @Override + public String apply(Id instance) { + Id id = getExistingId(instance); + return id.id; + } + + }); + + Map<String, AtlasVertex> instanceVertices = graphHelper.getVerticesForGUIDs(guidsToLoad); + + List<String> missingGuids = new ArrayList<>(); + for(int i = 0 ; i < instancesToLoad.size(); i++) { + + String guid = guidsToLoad.get(i); + AtlasVertex instanceVertex = instanceVertices.get(guid); + if(instanceVertex == null) { + missingGuids.add(guid); + continue; + } + + Id instance = instancesToLoad.get(i); + if(LOG.isDebugEnabled()) { + LOG.debug("Found vertex {} for instance {}", string(instanceVertex), instance); + } + result.put(instance, instanceVertex); + } + + 
if(missingGuids.size() > 0) { + throw new EntityNotFoundException("Could not find entities in the repository with the following GUIDs: " + missingGuids); + } + + for(Map.Entry<ClassType,List<IReferenceableInstance>> entry : context.getInstancesToLoadByUniqueAttribute().entrySet()) { + ClassType type = entry.getKey(); + List<IReferenceableInstance> instancesForClass = entry.getValue(); + List<AtlasVertex> correspondingVertices = graphHelper.getVerticesForInstancesByUniqueAttribute(type, instancesForClass); + for(int i = 0; i < instancesForClass.size(); i++) { + IReferenceableInstance inst = instancesForClass.get(i); + AtlasVertex vertex = correspondingVertices.get(i); + result.put(getExistingId(inst), vertex); + } + } + + return result; + } + + private void addFullTextProperty(List<ITypedReferenceableInstance> instances, FullTextMapper fulltextMapper) throws AtlasException { for (ITypedReferenceableInstance typedInstance : instances) { // Traverse AtlasVertex instanceVertex = getClassVertex(typedInstance); @@ -356,9 +405,9 @@ public final class TypedInstanceToGraphMapper { IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType(); String propertyName = GraphHelper.getQualifiedFieldName(typedInstance, attributeInfo); - + List<Object> currentElements = GraphHelper.getArrayElementsProperty(elementType, instanceVertex, propertyName); - + List<Object> newElementsCreated = new ArrayList<>(); if (!newAttributeEmpty) { @@ -582,6 +631,7 @@ public final class TypedInstanceToGraphMapper { // add a new vertex for the struct or trait instance AtlasVertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null, Collections.<String>emptySet()); // no super types for struct type + if (LOG.isDebugEnabled()) { LOG.debug("created vertex {} for struct {} value {}", string(structInstanceVertex), attributeInfo.name, structInstance.toShortString()); @@ -649,21 +699,24 @@ public final class TypedInstanceToGraphMapper { 
return graphHelper.getOrCreateEdge(instanceVertex, toVertex, edgeLabel); } - private AtlasVertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { - AtlasVertex referenceVertex = null; + private <V,E> AtlasVertex<V,E> getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { + AtlasVertex<V,E> referenceVertex = null; Id id = null; if (typedReference != null) { - id = typedReference instanceof Id ? (Id) typedReference : typedReference.getId(); - if (id.isAssigned()) { + id = getExistingId(typedReference); + referenceVertex = idToVertexMap.get(id); + if(referenceVertex == null && id.isAssigned()) { referenceVertex = graphHelper.getVertexForGUID(id.id); - } else { - referenceVertex = idToVertexMap.get(id); } } return referenceVertex; } + Id getExistingId(IReferenceableInstance instance) { + return instance instanceof Id ? (Id) instance : instance.getId(); + } + private Id getId(ITypedReferenceableInstance typedReference) throws EntityNotFoundException { if (typedReference == null) { throw new IllegalArgumentException("typedReference must be non-null"); @@ -768,4 +821,8 @@ public final class TypedInstanceToGraphMapper { GraphHelper.setProperty(instanceVertex, vertexPropertyName, propertyValue); } + + public AtlasVertex lookupVertex(Id refId) { + return idToVertexMap.get(refId); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/repository/graph/VertexLookupContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/VertexLookupContext.java b/repository/src/main/java/org/apache/atlas/repository/graph/VertexLookupContext.java new file mode 100644 index 0000000..dd90be6 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/graph/VertexLookupContext.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graph; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; + +/** + * Helper class for TypedInstanceGraphMapper. Determines which instances + * should be loaded by GUID and which ones should be loaded by unique attribute. + * In addition, it sorts the instances that should be loaded by unique + * attribute by class. 
+ * + */ +public class VertexLookupContext { + + private final TypedInstanceToGraphMapper mapper; + + private static final TypeSystem typeSystem = TypeSystem.getInstance(); + + private Map<ClassType,List<IReferenceableInstance>> instancesWithoutGuids = new HashMap<>(); + private Set<Id> guidsToLookup = new HashSet<>(); + + + /** + * @param typedInstanceToGraphMapper + */ + VertexLookupContext(TypedInstanceToGraphMapper typedInstanceToGraphMapper) { + mapper = typedInstanceToGraphMapper; + } + + /** + * Adds an instance to be loaded. + * + */ + public void addInstance(IReferenceableInstance instance) throws AtlasException { + + ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName()); + ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED); + findReferencedInstancesToPreLoad(newInstance); + Id id = instance.getId(); + if(mapper.lookupVertex(id) == null) { + if(id.isAssigned()) { + guidsToLookup.add(id); + } + else { + addToClassMap(classType, instance); + } + } + } + + /** + * Returns the instances that should be loaded by unique attribute, sorted by + * class. 
+ * + */ + public Map<ClassType,List<IReferenceableInstance>> getInstancesToLoadByUniqueAttribute() { + return instancesWithoutGuids; + } + + /** + * Returns the Ids of the instance that should be loaded by GUID + * + * @return + */ + public Set<Id> getInstancesToLoadByGuid() { + return guidsToLookup; + } + + private void addToClassMap(ClassType classType, IReferenceableInstance instance) throws AtlasException { + + List<IReferenceableInstance> toUpdate = instancesWithoutGuids.get(classType); + if(toUpdate == null) { + toUpdate = new ArrayList<>(); + instancesWithoutGuids.put(classType, toUpdate); + } + toUpdate.add(instance); + } + + private void findReferencedInstancesToPreLoad(ITypedReferenceableInstance newInstance) throws AtlasException { + //pre-load vertices for reference fields + for(AttributeInfo info : newInstance.fieldMapping().fields.values()) { + + if(info.dataType().getTypeCategory() == TypeCategory.CLASS) { + ITypedReferenceableInstance newAttributeValue = (ITypedReferenceableInstance)newInstance.get(info.name); + addAdditionalInstance(newAttributeValue); + } + + if(info.dataType().getTypeCategory() == TypeCategory.ARRAY) { + IDataType elementType = ((DataTypes.ArrayType) info.dataType()).getElemType(); + if(elementType.getTypeCategory() == TypeCategory.CLASS) { + List<ITypedReferenceableInstance> newElements = (List) newInstance.get(info.name); + addAdditionalInstances(newElements); + } + } + + if(info.dataType().getTypeCategory() == TypeCategory.MAP) { + IDataType elementType = ((DataTypes.MapType) info.dataType()).getValueType(); + if(elementType.getTypeCategory() == TypeCategory.CLASS) { + Map<Object, ITypedReferenceableInstance> newAttribute = + (Map<Object, ITypedReferenceableInstance>) newInstance.get(info.name); + + if(newAttribute != null) { + addAdditionalInstances(newAttribute.values()); + } + } + } + } + } + + private void addAdditionalInstance(ITypedReferenceableInstance instance) { + + if(instance == null) { + return; + } + + Id id = 
mapper.getExistingId(instance); + if(! id.isAssigned()) { + return; + } + guidsToLookup.add(id); + } + + + + private void addAdditionalInstances(Collection<ITypedReferenceableInstance> newElements) { + if(newElements != null) { + for(ITypedReferenceableInstance instance: newElements) { + addAdditionalInstance(instance); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 35a489f..b14531f 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -367,6 +367,10 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang private void validateUniqueAttribute(String entityType, String attributeName) throws AtlasException { ClassType type = typeSystem.getDataType(ClassType.class, entityType); AttributeInfo attribute = type.fieldMapping().fields.get(attributeName); + if(attribute == null) { + throw new IllegalArgumentException( + String.format("%s is not an attribute in %s", attributeName, entityType)); + } if (!attribute.isUnique) { throw new IllegalArgumentException( String.format("%s.%s is not a unique attribute", entityType, attributeName)); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java b/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java new file mode 100644 index 0000000..6e22604 
--- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.util; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.typesystem.IReferenceableInstance; + +/** + * Map of attribute values to a collection of IndexedInstances with that attribute value. + * + * @see GraphHelper#getVerticesForInstancesByUniqueAttributes + * + */ +public class AttributeValueMap { + + //need collection in case they are adding the same entity twice? 
+ private Map<Object,Collection<IndexedInstance>> valueMap_ = new HashMap<>(); + + public void put(Object value, IReferenceableInstance instance, int index) { + IndexedInstance wrapper = new IndexedInstance(instance, index); + Collection<IndexedInstance> existingValues = valueMap_.get(value); + if(existingValues == null) { + //only expect 1 value + existingValues = new HashSet<>(1); + valueMap_.put(value, existingValues); + } + existingValues.add(wrapper); + } + + public Collection<IndexedInstance> get(Object value) { + return valueMap_.get(value); + } + + + public Set<Object> getAttributeValues() { + return valueMap_.keySet(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java b/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java new file mode 100644 index 0000000..60ec8cc --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.util; + +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.typesystem.IReferenceableInstance; + +/** + * Data structure that stores an IReferenceableInstance and its location within + * a list. + * + * @see GraphHelper#getVerticesForInstancesByUniqueAttributes + */ +public class IndexedInstance { + + private final IReferenceableInstance instance_; + private final int index_; + + public IndexedInstance(IReferenceableInstance instance, int index) { + super(); + this.instance_ = instance; + this.index_ = index; + } + + public IReferenceableInstance getInstance() { + return instance_; + } + + public int getIndex() { + return index_; + } + + @Override + public int hashCode() { + return instance_.hashCode(); + } + + @Override + public boolean equals(Object other) { + if(!(other instanceof IndexedInstance)) { + return false; + } + IndexedInstance otherInstance = (IndexedInstance)other; + return instance_.equals(otherInstance.getInstance()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java index a7dc13d..f5a6a05 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java @@ -18,15 +18,42 @@ package org.apache.atlas.repository.graph; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; 
+import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.inject.Inject; + +import org.apache.atlas.AtlasException; import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.TestUtils; import org.apache.atlas.repository.graph.GraphHelper.VertexInfo; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.services.MetadataService; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.exception.TypeNotFoundException; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.TypeSystem; +import org.codehaus.jettison.json.JSONArray; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -34,20 +61,6 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import javax.inject.Inject; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - @Guice(modules = RepositoryMetadataModule.class) public class GraphHelperTest { @@ -69,6 +82,9 @@ public class GraphHelperTest { } @Inject + private 
MetadataService metadataService; + + @Inject private GraphBackedMetadataRepository repositoryService; private TypeSystem typeSystem; @@ -82,7 +98,12 @@ public class GraphHelperTest { typeSystem.reset(); new GraphBackedSearchIndexer(typeRegistry); - + TypesDef typesDef = TestUtils.defineHiveTypes(); + try { + metadataService.getTypeDefinition(TestUtils.TABLE_TYPE); + } catch (TypeNotFoundException e) { + metadataService.createType(TypesSerialization.toJson(typesDef)); + } TestUtils.defineDeptEmployeeTypes(typeSystem); } @@ -92,6 +113,43 @@ public class GraphHelperTest { } @Test + public void testGetInstancesByUniqueAttributes() throws Exception { + + GraphHelper helper = GraphHelper.getInstance(); + List<ITypedReferenceableInstance> instances = new ArrayList<>(); + List<String> guids = new ArrayList<>(); + TypeSystem ts = TypeSystem.getInstance(); + ClassType dbType = ts.getDataType(ClassType.class, TestUtils.DATABASE_TYPE); + + for(int i = 0; i < 10; i++) { + Referenceable db = TestUtils.createDBEntity(); + String guid = createInstance(db); + ITypedReferenceableInstance instance = convert(db, dbType); + instances.add(instance); + guids.add(guid); + } + + //lookup vertices via getVertexForInstanceByUniqueAttributes + List<AtlasVertex> vertices = helper.getVerticesForInstancesByUniqueAttribute(dbType, instances); + assertEquals(instances.size(), vertices.size()); + //assert vertex matches the vertex we get through getVertexForGUID + for(int i = 0; i < instances.size(); i++) { + String guid = guids.get(i); + AtlasVertex foundVertex = vertices.get(i); + AtlasVertex expectedVertex = helper.getVertexForGUID(guid); + assertEquals(foundVertex, expectedVertex); + } + } + @Test + public void testGetVerticesForGUIDSWithDuplicates() throws Exception { + ITypedReferenceableInstance hrDept = TestUtils.createDeptEg1(TypeSystem.getInstance()); + List<String> result = repositoryService.createEntities(hrDept); + String guid = result.get(0); + Map<String, AtlasVertex> 
verticesForGUIDs = GraphHelper.getInstance().getVerticesForGUIDs(Arrays.asList(guid, guid)); + Assert.assertEquals(verticesForGUIDs.size(), 1); + Assert.assertTrue(verticesForGUIDs.containsKey(guid)); + } + @Test public void testGetCompositeGuidsAndVertices() throws Exception { ITypedReferenceableInstance hrDept = TestUtils.createDeptEg1(typeSystem); List<String> createdGuids = repositoryService.createEntities(hrDept); @@ -144,4 +202,22 @@ public class GraphHelperTest { assertFalse(iterator.hasNext()); assertFalse(iterator.hasNext()); } + + private ITypedReferenceableInstance convert(Referenceable instance, ClassType type) throws AtlasException { + + return type.convert(instance, Multiplicity.REQUIRED); + } + + private String createInstance(Referenceable entity) throws Exception { + TestUtils.resetRequestContext(); + + String entityjson = InstanceSerialization.toJson(entity, true); + JSONArray entitiesJson = new JSONArray(); + entitiesJson.put(entityjson); + List<String> guids = metadataService.createEntities(entitiesJson.toString()); + if (guids != null && guids.size() > 0) { + return guids.get(guids.size() - 1); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/typesystem/src/test/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties index 108630b..0e6bc41 100644 --- a/typesystem/src/test/resources/atlas-application.properties +++ b/typesystem/src/test/resources/atlas-application.properties @@ -77,6 +77,7 @@ atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} ######### Hive Lineage Configs ######### ## Schema atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns +atlas.lineage.schema.query.hive_table_v1=hive_table_v1 where __guid='%s'\, columns ######### Notification Configs ######### 
atlas.notification.embedded=true http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java index 17c8237..9ca684d 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java @@ -144,7 +144,7 @@ public class EntityResource { throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT)); } catch (ValueConversionException ve) { LOG.error("Unable to persist entity instance due to a deserialization error entityDef={}", entityJson, ve); - throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST)); + throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause() != null ? 
ve.getCause() : ve, Response.Status.BAD_REQUEST)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to persist entity instance entityDef={}", entityJson, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index 1774611..29be942 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -51,8 +51,6 @@ import static org.testng.Assert.assertTrue; @Guice(modules = NotificationModule.class) public class EntityNotificationIT extends BaseResourceIT { - private static final String ENTITIES = "api/atlas/entities"; - private static final String TRAITS = "traits"; private final String DATABASE_NAME = "db" + randomString(); private final String TABLE_NAME = "table" + randomString(); @Inject @@ -66,7 +64,7 @@ public class EntityNotificationIT extends BaseResourceIT { public void setUp() throws Exception { super.setUp(); createTypeDefinitionsV1(); - Referenceable HiveDBInstance = createHiveDBInstanceV1(DATABASE_NAME); + Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME); dbId = createInstance(HiveDBInstance); List<NotificationConsumer<EntityNotification>> consumers = @@ -77,13 +75,13 @@ public class EntityNotificationIT extends BaseResourceIT { @Test public void testCreateEntity() throws Exception { - Referenceable tableInstance = createHiveTableInstanceV1(DATABASE_NAME, TABLE_NAME, dbId); + Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId); tableId 
= createInstance(tableInstance); final String guid = tableId._getId(); waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } @Test(dependsOnMethods = "testCreateEntity") @@ -96,29 +94,29 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.updateEntityAttribute(guid, property, newValue); waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } @Test public void testDeleteEntity() throws Exception { final String tableName = "table-" + randomString(); final String dbName = "db-" + randomString(); - Referenceable HiveDBInstance = createHiveDBInstanceV1(dbName); + Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName); Id dbId = createInstance(HiveDBInstance); - Referenceable tableInstance = createHiveTableInstanceV1(dbName, tableName, dbId); + Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId); final Id tableId = createInstance(tableInstance); final String guid = tableId._getId(); waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); - atlasClientV1.deleteEntity(HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); waitForNotification(notificationConsumer, MAX_WAIT_TIME, - 
newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); } @Test(dependsOnMethods = "testCreateEntity") @@ -141,7 +139,7 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.addTrait(guid, traitInstance); EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); IReferenceableInstance entity = entityNotification.getEntity(); assertTrue(entity.getTraits().contains(traitName)); @@ -166,7 +164,7 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.addTrait(guid, traitInstance); entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); allTraits = entityNotification.getAllTraits(); allTraitNames = new LinkedList<>(); @@ -187,7 +185,7 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.deleteTrait(guid, traitName); EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid)); + newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); assertFalse(entityNotification.getEntity().getTraits().contains(traitName)); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/89f70609/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java 
---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index 4a3db88..1c2cdc6 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -68,7 +68,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { new Referenceable(randomString()))); //send valid message - final Referenceable entity = new Referenceable(DATABASE_TYPE); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -79,7 +79,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, entity.get(NAME))); + JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME))); return results.length() == 1; } }); @@ -87,7 +87,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testCreateEntity() throws Exception { - final Referenceable entity = new Referenceable(DATABASE_TYPE); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -99,13 +99,13 @@ public class NotificationHookConsumerIT extends BaseResourceIT { waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE, entity.get(QUALIFIED_NAME))); + JSONArray results = 
searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME))); return results.length() == 1; } }); //Assert that user passed in hook message is used in audit - Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); + Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1); assertEquals(events.size(), 1); @@ -114,7 +114,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityPartial() throws Exception { - final Referenceable entity = new Referenceable(DATABASE_TYPE); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); final String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -123,26 +123,26 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); - final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); newEntity.set("owner", randomString()); sendHookMessage( - new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, QUALIFIED_NAME, dbName, newEntity)); + new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE, QUALIFIED_NAME, dbName); + Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner"))); } }); //Its partial update and un-set fields are not 
updated - Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE, QUALIFIED_NAME, dbName); + Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION)); } @Test public void testUpdatePartialUpdatingQualifiedName() throws Exception { - final Referenceable entity = new Referenceable(DATABASE_TYPE); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); final String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -151,29 +151,29 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); - final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); final String newName = "db" + randomString(); newEntity.set(QUALIFIED_NAME, newName); sendHookMessage( - new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE, QUALIFIED_NAME, dbName, newEntity)); + new HookNotification.EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE, newName)); + JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName)); return results.length() == 1; } }); //no entity with the old qualified name - JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE, dbName)); + JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName)); assertEquals(results.length(), 0); } @Test public void testDeleteByQualifiedName() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE); + Referenceable entity 
= new Referenceable(DATABASE_TYPE_BUILTIN); final String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -183,7 +183,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { final String dbId = atlasClientV1.createEntity(entity).get(0); sendHookMessage( - new HookNotification.EntityDeleteRequest(TEST_USER, DATABASE_TYPE, QUALIFIED_NAME, dbName)); + new HookNotification.EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName)); waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { @@ -195,7 +195,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityFullUpdate() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE); + Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); final String dbName = "db" + randomString(); entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); @@ -204,7 +204,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); - final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); newEntity.set(NAME, randomString()); newEntity.set(DESCRIPTION, randomString()); newEntity.set("owner", randomString()); @@ -216,12 +216,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT { waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE, newEntity.get(QUALIFIED_NAME))); + JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME))); return results.length() == 1; } }); - Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE, QUALIFIED_NAME, dbName); + Referenceable actualEntity = 
atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION)); assertEquals(actualEntity.get("owner"), newEntity.get("owner")); }
