http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java new file mode 100644 index 0000000..b874c5d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import atlas.shaded.hbase.guava.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Provider; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; +import org.apache.atlas.repository.store.graph.EntityResolver; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; + +import javax.inject.Inject; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { + + private AtlasTypeRegistry typeRegistry; + + private Set<String> processedIds = new HashSet<>(); + + private EntityGraphDiscoveryContext discoveredEntities = new EntityGraphDiscoveryContext(); + + private final Collection<EntityResolver> entityResolvers = new LinkedHashSet<>(); + + @Inject + public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, final Collection<Provider<EntityResolver>> entityResolverProviders) { + this.typeRegistry = typeRegistry; + + for (Provider<EntityResolver> entityResolverProvider : entityResolverProviders) { + entityResolvers.add(entityResolverProvider.get()); + } + } + + @VisibleForTesting + public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, final List<EntityResolver> entityResolvers) { + this.typeRegistry = typeRegistry; + + for (EntityResolver entityResolver : entityResolvers) { + this.entityResolvers.add(entityResolver); + } + } + + @Override + public void init() throws AtlasBaseException { + //Nothing to do + } + + @Override + public EntityGraphDiscoveryContext discoverEntities(final List<AtlasEntity> entities) throws AtlasBaseException { + + //walk the graph and discover entity references + discover(entities); + + //resolve root and referred entities + resolveReferences(); + + return discoveredEntities; + } + + @Override + public void cleanUp() throws AtlasBaseException { + processedIds.clear(); + discoveredEntities.cleanUp(); + final Collection<EntityResolver> entityResolvers = this.entityResolvers; + for (EntityResolver resolver : entityResolvers) { + resolver.cleanUp(); + } + } + + + protected void resolveReferences() throws AtlasBaseException { + for (EntityResolver resolver : entityResolvers ) { + resolver.init(discoveredEntities); + resolver.resolveEntityReferences(); + } + } + + + protected void discover(final List<AtlasEntity> entities) throws AtlasBaseException { + for (AtlasEntity entity : entities) { + AtlasType type = typeRegistry.getType(entity.getTypeName()); + + discoveredEntities.addRootEntity(entity); + walkEntityGraph(type, entity); + } + } + + private void visitReference(AtlasEntityType type, Object entity, boolean isManagedEntity) throws AtlasBaseException { + if ( entity != null) { + if ( entity instanceof String ) { + String guid = (String) entity; + discoveredEntities.addUnResolvedIdReference(type, guid); + } else if ( entity instanceof AtlasObjectId ) { + final String guid = ((AtlasObjectId) entity).getGuid(); + discoveredEntities.addUnResolvedIdReference(type, guid); + } else if ( entity instanceof AtlasEntity ) { + AtlasEntity entityObj = ( AtlasEntity ) entity; + if (!processedIds.contains(entityObj.getGuid())) { + processedIds.add(entityObj.getGuid()); + + if ( isManagedEntity ) { + discoveredEntities.addRootEntity(entityObj); + visitStruct(type, entityObj); + } else if ( entity instanceof AtlasObjectId) { + discoveredEntities.addUnResolvedIdReference(type, ((AtlasObjectId) entity).getGuid()); + } else { + discoveredEntities.addUnResolvedEntityReference(entityObj); + } + } + } + } + } + + void visitAttribute(AtlasStructType parentType, AtlasType attrType, AtlasStructDef.AtlasAttributeDef attrDef, Object val) throws AtlasBaseException { + if (val != null) { + if ( isPrimitive(attrType.getTypeCategory()) ) { + return; + } + if (attrType.getTypeCategory() == TypeCategory.ARRAY) { + AtlasArrayType arrayType = (AtlasArrayType) attrType; + AtlasType elemType = arrayType.getElementType(); + visitCollectionReferences(parentType, attrType, attrDef, elemType, val); + } else if (attrType.getTypeCategory() == TypeCategory.MAP) { + AtlasType keyType = ((AtlasMapType) attrType).getKeyType(); + AtlasType valueType = ((AtlasMapType) attrType).getValueType(); + visitMapReferences(parentType, attrType, attrDef, keyType, valueType, val); + } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) { + visitStruct(attrType, val); + } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) { + if ( val instanceof AtlasObjectId || val instanceof String) { + visitReference((AtlasEntityType) attrType, val, false); + } else if ( val instanceof AtlasEntity ) { + //TODO - Change this to foreign key checks after changes in the model + if ( parentType.isMappedFromRefAttribute(attrDef.getName())) { + visitReference((AtlasEntityType) attrType, val, true); + } + } + } + } + } + + void visitMapReferences(AtlasStructType parentType, final AtlasType attrType, AtlasStructDef.AtlasAttributeDef attrDef, AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException { + if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) { + return; + } + + if (val != null) { + Iterator<Map.Entry> it = null; + if (Map.class.isAssignableFrom(val.getClass())) { + it = ((Map) val).entrySet().iterator(); + ImmutableMap.Builder b = ImmutableMap.builder(); + while (it.hasNext()) { + Map.Entry e = it.next(); + visitAttribute(parentType, keyType, attrDef, e.getKey()); + visitAttribute(parentType, valueType, attrDef, e.getValue()); + } + } + } + } + + void visitCollectionReferences(final AtlasStructType parentType, final AtlasType attrType, final AtlasStructDef.AtlasAttributeDef attrDef, AtlasType elemType, Object val) throws AtlasBaseException { + + if (isPrimitive(elemType.getTypeCategory())) { + return; + } + + if (val != null) { + Iterator it = null; + if (val instanceof Collection) { + it = ((Collection) val).iterator(); + } else if (val instanceof Iterable) { + it = ((Iterable) val).iterator(); + } else if (val instanceof Iterator) { + it = (Iterator) val; + } + if (it != null) { + while (it.hasNext()) { + Object elem = it.next(); + visitAttribute(parentType, elemType, attrDef, elem); + } + } + } + } + + void visitStruct(AtlasType type, Object val) throws AtlasBaseException { + + if (val == null || !(val instanceof AtlasStruct)) { + return; + } + + AtlasStructType structType = (AtlasStructType) type; + + for (AtlasStructDef.AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) { + String attrName = attributeDef.getName(); + AtlasType attrType = structType.getAttributeType(attrName); + Object attrVal = ((AtlasStruct) val).getAttribute(attrName); + visitAttribute(structType, attrType, attributeDef, attrVal); + } + } + + + void walkEntityGraph(AtlasType type, AtlasEntity entity) throws AtlasBaseException { + visitStruct(type, entity); + } + + + boolean isPrimitive(TypeCategory typeCategory) { + return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index e731c11..18e397b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -18,25 +18,93 @@ package org.apache.atlas.repository.store.graph.v1; +import atlas.shaded.hbase.guava.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.GraphTransaction; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityWithAssociations; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; - +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; import java.util.List; public class AtlasEntityStoreV1 implements AtlasEntityStore { - @Override - public void init() throws AtlasBaseException { + + protected EntityGraphDiscovery graphDiscoverer; + protected AtlasTypeRegistry typeRegistry; + + private EntityGraphMapper graphMapper; + + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); + + @Inject + public AtlasEntityStoreV1(EntityGraphMapper vertexMapper) { + this.graphMapper = vertexMapper; + } + + @Inject + public void init(AtlasTypeRegistry typeRegistry, EntityGraphDiscovery graphDiscoverer) throws AtlasBaseException { + this.graphDiscoverer = graphDiscoverer; + this.typeRegistry = typeRegistry; } @Override - public EntityMutationResponse createOrUpdate(final AtlasEntity entity) { - return null; + public EntityMutationResponse createOrUpdate(final AtlasEntity entity) throws AtlasBaseException { + return createOrUpdate(new ArrayList<AtlasEntity>() {{ add(entity); }}); + } + + public EntityMutationContext preCreateOrUpdate(final List<AtlasEntity> atlasEntities) throws AtlasBaseException { + + EntityGraphDiscoveryContext discoveredEntities = graphDiscoverer.discoverEntities(atlasEntities); + EntityMutationContext context = new EntityMutationContext(discoveredEntities); + for (AtlasEntity entity : discoveredEntities.getRootEntities()) { + + AtlasVertex vertex = null; + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity); + } + + AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + + if ( discoveredEntities.isResolved(entity.getGuid()) ) { + vertex = discoveredEntities.getResolvedReference(entity.getGuid()); + context.addUpdated(entity, entityType, vertex); + + String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); + RequestContextV1.get().recordEntityUpdate(guid); + } else { + //Create vertices which do not exist in the repository + vertex = graphMapper.createVertexTemplate(entity, entityType); + context.addCreated(entity, entityType, vertex); + discoveredEntities.addRepositoryResolvedReference(new AtlasObjectId(entityType.getTypeName(), entity.getGuid()), vertex); + + String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); + RequestContextV1.get().recordEntityCreate(guid); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity, vertex); + } + } + + return context; } @Override @@ -55,8 +123,24 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } @Override + @GraphTransaction public EntityMutationResponse createOrUpdate(final List<AtlasEntity> entities) throws AtlasBaseException { - return null; + + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityStoreV1.createOrUpdate({}, {})", entities); + } + + //Validate + List<AtlasEntity> normalizedEntities = validateAndNormalize(entities); + + //Discover entities, create vertices + EntityMutationContext ctx = preCreateOrUpdate(normalizedEntities); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasStructDefStoreV1.createOrUpdate({}, {}): {}", entities); + } + + return graphMapper.mapAttributes(ctx); } @Override @@ -117,7 +201,48 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { @Override public AtlasEntity.AtlasEntities searchEntities(final SearchFilter searchFilter) throws AtlasBaseException { - // TODO: Add checks here to ensure that typename and supertype are mandatory in the requests + // TODO: Add checks here to ensure that typename and supertype are mandatory in the request return null; } + + private List<AtlasEntity> validateAndNormalize(final List<AtlasEntity> entities) throws AtlasBaseException { + + List<AtlasEntity> normalizedEntities = new ArrayList<>(); + List<String> messages = new ArrayList<>(); + + for (AtlasEntity entity : entities) { + AtlasType type = typeRegistry.getType(entity.getTypeName()); + if (type.getTypeCategory() != TypeCategory.ENTITY) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, type.getTypeCategory().name(), TypeCategory.ENTITY.name()); + } + + type.validateValue(entity, entity.getTypeName(), messages); + + if ( !messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); + } + AtlasEntity normalizedEntity = (AtlasEntity) type.getNormalizedValue(entity); + if ( normalizedEntity == null) { + //TODO - Fix this. Should not come here. Should ideally fail above + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Failed to validate entity"); + } + normalizedEntities.add(normalizedEntity); + } + + return normalizedEntities; + } + + @VisibleForTesting + EntityGraphDiscovery getGraphDiscoverer() { + return graphDiscoverer; + } + + @VisibleForTesting + void setGraphDiscoverer(EntityGraphDiscovery discoverer) { + this.graphDiscoverer = discoverer; + } + + public void cleanUp() throws AtlasBaseException { + this.graphDiscoverer.cleanUp(); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEnumDefStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEnumDefStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEnumDefStoreV1.java index fccbeba..3cb10ff 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEnumDefStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEnumDefStoreV1.java @@ -283,19 +283,19 @@ public class AtlasEnumDefStoreV1 extends AtlasAbstractDefStoreV1 implements Atla List<String> values = new ArrayList<>(enumDef.getElementDefs().size()); for (AtlasEnumElementDef element : enumDef.getElementDefs()) { - String elemKey = AtlasGraphUtilsV1.getPropertyKey(enumDef, element.getValue()); + String elemKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(enumDef, element.getValue()); AtlasGraphUtilsV1.setProperty(vertex, elemKey, element.getOrdinal()); if (StringUtils.isNoneBlank(element.getDescription())) { - String descKey = AtlasGraphUtilsV1.getPropertyKey(elemKey, "description"); + String descKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(elemKey, "description"); AtlasGraphUtilsV1.setProperty(vertex, descKey, element.getDescription()); } values.add(element.getValue()); } - AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getPropertyKey(enumDef), values); + AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getTypeDefPropertyKey(enumDef), values); } private AtlasEnumDef toEnumDef(AtlasVertex vertex) { @@ -314,10 +314,10 @@ public class AtlasEnumDefStoreV1 extends AtlasAbstractDefStoreV1 implements Atla typeDefStore.vertexToTypeDef(vertex, ret); List<AtlasEnumElementDef> elements = new ArrayList<>(); - List<String> elemValues = vertex.getProperty(AtlasGraphUtilsV1.getPropertyKey(ret), List.class); + List<String> elemValues = vertex.getProperty(AtlasGraphUtilsV1.getTypeDefPropertyKey(ret), List.class); for (String elemValue : elemValues) { - String elemKey = AtlasGraphUtilsV1.getPropertyKey(ret, elemValue); - String descKey = AtlasGraphUtilsV1.getPropertyKey(elemKey, "description"); + String elemKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(ret, elemValue); + String descKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(elemKey, "description"); Integer ordinal = AtlasGraphUtilsV1.getProperty(vertex, elemKey, Integer.class); String desc = AtlasGraphUtilsV1.getProperty(vertex, descKey, String.class); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index 18b3b85..1947855 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -20,16 +20,30 @@ package org.apache.atlas.repository.store.graph.v1; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.IDataType; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; /** @@ -52,19 +66,19 @@ public class AtlasGraphUtilsV1 { }}); - public static String getPropertyKey(AtlasBaseTypeDef typeDef) { - return getPropertyKey(typeDef.getName()); + public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) { + return getTypeDefPropertyKey(typeDef.getName()); } - public static String getPropertyKey(AtlasBaseTypeDef typeDef, String child) { - return getPropertyKey(typeDef.getName(), child); + public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef, String child) { + return getTypeDefPropertyKey(typeDef.getName(), child); } - public static String getPropertyKey(String typeName) { + public static String getTypeDefPropertyKey(String typeName) { return PROPERTY_PREFIX + typeName; } - public static String getPropertyKey(String typeName, String child) { + public static String getTypeDefPropertyKey(String typeName, String child) { return PROPERTY_PREFIX + typeName + "." + child; } @@ -80,6 +94,31 @@ public class AtlasGraphUtilsV1 { return PROPERTY_PREFIX + "edge." + fromNode + "." + toNode; } + public static String getAttributeEdgeLabel(AtlasStructType fromType, String attributeName) throws AtlasBaseException { + return GraphHelper.EDGE_LABEL_PREFIX + getQualifiedAttributePropertyKey(fromType, attributeName); + } + + public static String getQualifiedAttributePropertyKey(AtlasStructType fromType, String attributeName) throws AtlasBaseException { + switch (fromType.getTypeCategory()) { + case STRUCT: + case ENTITY: + case CLASSIFICATION: + return fromType.getQualifiedAttributeName(attributeName); + default: + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPE, fromType.getTypeCategory().name()); + } + } + + public static boolean isReference(AtlasType type) { + return isReference(type.getTypeCategory()); + } + + public static boolean isReference(TypeCategory typeCategory) { + return typeCategory == TypeCategory.STRUCT || + typeCategory == TypeCategory.ENTITY || + typeCategory == TypeCategory.CLASSIFICATION; + } + public static String encodePropertyKey(String key) { String ret = key; @@ -104,6 +143,21 @@ public class AtlasGraphUtilsV1 { return ret; } + /** + * Adds an additional value to a multi-property. + * + * @param propertyName + * @param value + */ + public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value); + } + propertyName = encodePropertyKey(propertyName); + vertex.addProperty(propertyName, value); + return vertex; + } + public static <T extends AtlasElement> void setProperty(T element, String propertyName, Object value) { if (LOG.isDebugEnabled()) { LOG.debug("==> setProperty({}, {}, {})", toString(element), propertyName, value); @@ -127,7 +181,12 @@ public class AtlasGraphUtilsV1 { LOG.debug("Setting property {} in {}", propertyName, toString(element)); } - element.setProperty(propertyName, value); + if ( value instanceof Date) { + Long encodedValue = ((Date) value).getTime(); + element.setProperty(propertyName, encodedValue); + } else { + element.setProperty(propertyName, value); + } } } } @@ -186,4 +245,13 @@ public class AtlasGraphUtilsV1 { return String.format("edge[id=%s label=%s from %s -> to %s]", edge.getId(), edge.getLabel(), toString(edge.getOutVertex()), toString(edge.getInVertex())); } + + public static AtlasEntity.Status getState(AtlasElement element) { + String state = getStateAsString(element); + return state == null ? null : AtlasEntity.Status.valueOf(state); + } + + public static String getStateAsString(AtlasElement element) { + return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasStructDefStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasStructDefStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasStructDefStoreV1.java index e780dc1..425bde9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasStructDefStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasStructDefStoreV1.java @@ -390,13 +390,13 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At List<String> attrNames = new ArrayList<>(structDef.getAttributeDefs().size()); for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { - String propertyKey = AtlasGraphUtilsV1.getPropertyKey(structDef, attributeDef.getName()); + String propertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(structDef, attributeDef.getName()); AtlasGraphUtilsV1.setProperty(vertex, propertyKey, toJsonFromAttributeDef(attributeDef, structType)); attrNames.add(attributeDef.getName()); } - AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getPropertyKey(structDef), attrNames); + AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getTypeDefPropertyKey(structDef), attrNames); } public static void updateVertexPreUpdate(AtlasStructDef structDef, AtlasStructType structType, @@ -410,7 +410,7 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At } } - List<String> currAttrNames = vertex.getProperty(AtlasGraphUtilsV1.getPropertyKey(structDef), List.class); + List<String> currAttrNames = vertex.getProperty(AtlasGraphUtilsV1.getTypeDefPropertyKey(structDef), List.class); // delete attributes that are not present in updated structDef if (CollectionUtils.isNotEmpty(currAttrNames)) { @@ -434,13 +434,13 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At } } - String propertyKey = AtlasGraphUtilsV1.getPropertyKey(structDef, attributeDef.getName()); + String propertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(structDef, attributeDef.getName()); AtlasGraphUtilsV1.setProperty(vertex, propertyKey, toJsonFromAttributeDef(attributeDef, structType)); } } - AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getPropertyKey(structDef), attrNames); + AtlasGraphUtilsV1.setProperty(vertex, AtlasGraphUtilsV1.getTypeDefPropertyKey(structDef), attrNames); } public static void updateVertexAddReferences(AtlasStructDef structDef, AtlasVertex vertex, @@ -457,11 +457,11 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At typeDefStore.vertexToTypeDef(vertex, ret); List<AtlasAttributeDef> attributeDefs = new ArrayList<>(); - List<String> attrNames = vertex.getProperty(AtlasGraphUtilsV1.getPropertyKey(ret), List.class); + List<String> attrNames = vertex.getProperty(AtlasGraphUtilsV1.getTypeDefPropertyKey(ret), List.class); if (CollectionUtils.isNotEmpty(attrNames)) { for (String attrName : attrNames) { - String propertyKey = AtlasGraphUtilsV1.getPropertyKey(ret, attrName); + String propertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(ret, attrName); String attribJson = vertex.getProperty(propertyKey, String.class); attributeDefs.add(toAttributeDefFromJson(structDef, AtlasType.fromJson(attribJson, Map.class), @@ -586,12 +586,12 @@ public class AtlasStructDefStoreV1 extends AtlasAbstractDefStoreV1 implements At String refAttributeName = null; List<String> attrNames = attributeType.getProperty( - AtlasGraphUtilsV1.getPropertyKey(attrTypeName), List.class); + AtlasGraphUtilsV1.getTypeDefPropertyKey(attrTypeName), List.class); if (CollectionUtils.isNotEmpty(attrNames)) { for (String attrName : attrNames) { String attribJson = attributeType.getProperty( - AtlasGraphUtilsV1.getPropertyKey(attrTypeName, attrName), String.class); + AtlasGraphUtilsV1.getTypeDefPropertyKey(attrTypeName, attrName), String.class); Map refAttrInfo = AtlasType.fromJson(attribJson, Map.class); String refAttribType = (String) refAttrInfo.get("dataType"); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java new file mode 100644 index 0000000..3b557e6 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContextV1; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasEdgeLabel; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.Stack; + +import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; +import static org.apache.atlas.repository.graph.GraphHelper.string; + +public abstract class DeleteHandlerV1 { + + public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class); + + private AtlasTypeRegistry typeRegistry; + private boolean shouldUpdateReverseAttribute; + private boolean softDelete; + + protected static final GraphHelper graphHelper = GraphHelper.getInstance(); + + public DeleteHandlerV1(AtlasTypeRegistry typeRegistry, boolean shouldUpdateReverseAttribute, boolean softDelete) { + this.typeRegistry = typeRegistry; + this.shouldUpdateReverseAttribute = shouldUpdateReverseAttribute; + this.softDelete = softDelete; + } + + /** + * Deletes the specified entity vertices. + * Deletes any traits, composite entities, and structs owned by each entity. + * Also deletes all the references from/to the entity. + * + * @param instanceVertices + * @throws AtlasException + */ + public void deleteEntities(List<AtlasVertex> instanceVertices) throws AtlasBaseException { + RequestContextV1 requestContext = RequestContextV1.get(); + + Set<AtlasVertex> deletionCandidateVertices = new HashSet<>(); + + for (AtlasVertex instanceVertex : instanceVertices) { + String guid = GraphHelper.getGuid(instanceVertex); + AtlasEntity.Status state = AtlasGraphUtilsV1.getState(instanceVertex); + if (requestContext.getDeletedEntityIds().contains(guid) || state == AtlasEntity.Status.DELETED) { + LOG.debug("Skipping deletion of {} as it is already deleted", guid); + continue; + } + + // Get GUIDs and vertices for all deletion candidates. + Set<GraphHelper.VertexInfo> compositeVertices = getCompositeVertices(instanceVertex); + + // Record all deletion candidate GUIDs in RequestContext + // and gather deletion candidate vertices. + for (GraphHelper.VertexInfo vertexInfo : compositeVertices) { + requestContext.recordEntityDelete(vertexInfo.getGuid()); + deletionCandidateVertices.add(vertexInfo.getVertex()); + } + } + + // Delete traits and vertices. + for (AtlasVertex deletionCandidateVertex : deletionCandidateVertices) { + deleteAllTraits(deletionCandidateVertex); + deleteTypeVertex(deletionCandidateVertex, false); + } + } + + /** + * Get the GUIDs and vertices for all composite entities owned/contained by the specified root entity AtlasVertex. + * The graph is traversed from the root entity through to the leaf nodes of the containment graph. + * + * @param entityVertex the root entity vertex + * @return set of VertexInfo for all composite entities + * @throws AtlasException + */ + public Set<GraphHelper.VertexInfo> getCompositeVertices(AtlasVertex entityVertex) throws AtlasBaseException { + Set<GraphHelper.VertexInfo> result = new HashSet<>(); + Stack<AtlasVertex> vertices = new Stack<>(); + vertices.push(entityVertex); + while (vertices.size() > 0) { + AtlasVertex vertex = vertices.pop(); + String typeName = GraphHelper.getTypeName(vertex); + String guid = GraphHelper.getGuid(vertex); + AtlasEntity.Status state = AtlasGraphUtilsV1.getState(vertex); + if (state == AtlasEntity.Status.DELETED) { + //If the reference vertex is marked for deletion, skip it + continue; + } + result.add(new GraphHelper.VertexInfo(guid, vertex, typeName)); + AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(typeName); + for (AtlasStructType.AtlasAttribute attributeInfo : entityType.getAllAttributes().values()) { + if (!entityType.isMappedFromRefAttribute(attributeInfo.getAttributeDef().getName())) { + continue; + } + String edgeLabel = AtlasGraphUtilsV1.getAttributeEdgeLabel(entityType, attributeInfo.getAttributeDef().getName()); + AtlasType attrType = typeRegistry.getType(attributeInfo.getAttributeDef().getTypeName()); + switch (attrType.getTypeCategory()) { + case ENTITY: + AtlasEdge edge = graphHelper.getEdgeForLabel(vertex, edgeLabel); + if (edge != null && AtlasGraphUtilsV1.getState(edge) == AtlasEntity.Status.ACTIVE) { + AtlasVertex compositeVertex = edge.getInVertex(); + vertices.push(compositeVertex); + } + break; + case ARRAY: + AtlasArrayType arrType = (AtlasArrayType) attrType; + if (arrType.getElementType().getTypeCategory() != TypeCategory.ENTITY) { + continue; + } + Iterator<AtlasEdge> edges = graphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel); + if (edges != null) { + while (edges.hasNext()) { + edge = edges.next(); + if (edge != null && AtlasGraphUtilsV1.getState(edge) == AtlasEntity.Status.ACTIVE) { + AtlasVertex compositeVertex = edge.getInVertex(); + vertices.push(compositeVertex); + } + } + } + break; + case MAP: + AtlasMapType mapType = (AtlasMapType) attrType; + TypeCategory valueTypeCategory = mapType.getValueType().getTypeCategory(); + if (valueTypeCategory != TypeCategory.ENTITY) { + continue; + } + String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(entityType, attributeInfo.getAttributeDef().getName()); + List<String> keys = vertex.getProperty(propertyName, List.class); + if (keys != null) { + for (String key : keys) { + String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key); + edge = graphHelper.getEdgeForLabel(vertex, mapEdgeLabel); + if (edge != null && AtlasGraphUtilsV1.getState(edge) == AtlasEntity.Status.ACTIVE) { + AtlasVertex compositeVertex = edge.getInVertex(); + vertices.push(compositeVertex); + } + } + } + break; + default: + } + } + } + return result; + } + + + /** + * Force delete is used to remove struct/trait in case of entity updates + * @param edge + * @param typeCategory + * @param isComposite + * @param forceDeleteStructTrait + * @return returns true if the edge reference is hard deleted + * @throws AtlasException + */ + public boolean deleteEdgeReference(AtlasEdge edge, TypeCategory typeCategory, boolean isComposite, + boolean forceDeleteStructTrait) throws AtlasBaseException { + LOG.debug("Deleting {}", string(edge)); + boolean forceDelete = + (AtlasGraphUtilsV1.isReference(typeCategory)) + ? forceDeleteStructTrait : false; + if (AtlasGraphUtilsV1.isReference(typeCategory) && isComposite) { + //If the vertex is of type struct/trait, delete the edge and then the reference vertex as the vertex is not shared by any other entities. + //If the vertex is of type class, and its composite attribute, this reference vertex' lifecycle is controlled + //through this delete, hence delete the edge and the reference vertex. + AtlasVertex vertexForDelete = edge.getInVertex(); + + //If deleting the edge and then the in vertex, reverse attribute shouldn't be updated + deleteEdge(edge, false, forceDelete); + deleteTypeVertex(vertexForDelete, typeCategory, forceDelete); + } else { + //If the vertex is of type class, and its not a composite attributes, the reference AtlasVertex' lifecycle is not controlled + //through this delete. Hence just remove the reference edge. Leave the reference AtlasVertex as is + + //If deleting just the edge, reverse attribute should be updated for any references + //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated + deleteEdge(edge, true, false); + } + return !softDelete || forceDelete; + } + + protected void deleteEdge(AtlasEdge edge, boolean updateReverseAttribute, boolean force) throws AtlasBaseException { + //update reverse attribute + if (updateReverseAttribute) { + AtlasEdgeLabel atlasEdgeLabel = new AtlasEdgeLabel(edge.getLabel()); + + AtlasType parentType = typeRegistry.getType(atlasEdgeLabel.getTypeName()); + + if (parentType instanceof AtlasStructType) { + AtlasStructType parentStructType = (AtlasStructType) parentType; + if (parentStructType.isForeignKeyAttribute(atlasEdgeLabel.getAttributeName())) { + deleteEdgeBetweenVertices(edge.getInVertex(), edge.getOutVertex(), atlasEdgeLabel.getAttributeName()); + } + } + } + + deleteEdge(edge, force); + + } + + + protected void deleteTypeVertex(AtlasVertex instanceVertex, TypeCategory typeCategory, boolean force) throws AtlasBaseException { + switch (typeCategory) { + case STRUCT: + case CLASSIFICATION: + deleteTypeVertex(instanceVertex, force); + break; + + case ENTITY: + deleteEntities(Collections.singletonList(instanceVertex)); + break; + + default: + throw new IllegalStateException("Type category " + typeCategory + " not handled"); + } + } + + /** + * Deleting any type vertex. Goes over the complex attributes and removes the references + * @param instanceVertex + * @throws AtlasException + */ + protected void deleteTypeVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException { + LOG.debug("Deleting {}", string(instanceVertex)); + String typeName = GraphHelper.getTypeName(instanceVertex); + + + AtlasType parentType = typeRegistry.getType(typeName); + + if (parentType instanceof AtlasStructType) { + + AtlasStructType entityType = (AtlasStructType) parentType; + for (AtlasStructType.AtlasAttribute attributeInfo : getAttributes(entityType)) { + LOG.debug("Deleting attribute {} for {}", attributeInfo.getAttributeDef().getName(), string(instanceVertex)); + + AtlasType attrType = typeRegistry.getType(attributeInfo.getAttributeType().getTypeName()); + + String edgeLabel = AtlasGraphUtilsV1.getAttributeEdgeLabel(entityType, attributeInfo.getAttributeDef().getName()); + + switch (attrType.getTypeCategory()) { + case ENTITY: + //If its class attribute, delete the reference + deleteEdgeReference(instanceVertex, edgeLabel, TypeCategory.ENTITY, entityType.isMappedFromRefAttribute(attributeInfo.getAttributeDef().getName())); + break; + + case STRUCT: + //If its struct attribute, delete the reference + deleteEdgeReference(instanceVertex, edgeLabel, TypeCategory.STRUCT, false); + break; + + case ARRAY: + //For array attribute, if the element is struct/class, delete all the references + AtlasArrayType arrType = (AtlasArrayType) attrType; + AtlasType elemType = arrType.getElementType(); + if (AtlasGraphUtilsV1.isReference(elemType.getTypeCategory())) { + Iterator<AtlasEdge> edges = graphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel); + if (edges != null) { + while (edges.hasNext()) { + AtlasEdge edge = edges.next(); + deleteEdgeReference(edge, elemType.getTypeCategory(), entityType.isMappedFromRefAttribute(attributeInfo.getAttributeDef().getName()), false); + } + } + } + break; + + case MAP: + //For map attribute, if the value type is struct/class, delete all the references + AtlasMapType mapType = (AtlasMapType) attrType; + AtlasType keyType = mapType.getKeyType(); + TypeCategory valueTypeCategory = mapType.getValueType().getTypeCategory(); + String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(entityType, attributeInfo.getAttributeDef().getName()); + + if (AtlasGraphUtilsV1.isReference(valueTypeCategory)) { + List<Object> keys = ArrayVertexMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName); + if (keys != null) { + for (Object key : keys) { + String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, (String) key); + deleteEdgeReference(instanceVertex, mapEdgeLabel, valueTypeCategory, entityType.isMappedFromRefAttribute(attributeInfo.getAttributeDef().getName())); + } + } + } + } + } + } + + deleteVertex(instanceVertex, force); + } + + public void deleteEdgeReference(AtlasVertex outVertex, String edgeLabel, TypeCategory typeCategory, + boolean isComposite) throws AtlasBaseException { + AtlasEdge edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel); + if (edge != null) { + deleteEdgeReference(edge, typeCategory, isComposite, false); + } + } + + /** + * Delete all traits from the specified vertex. + * @param instanceVertex + * @throws AtlasException + */ + private void deleteAllTraits(AtlasVertex instanceVertex) throws AtlasBaseException { + List<String> traitNames = GraphHelper.getTraitNames(instanceVertex); + LOG.debug("Deleting traits {} for {}", traitNames, string(instanceVertex)); + String typeName = GraphHelper.getTypeName(instanceVertex); + + for (String traitNameToBeDeleted : traitNames) { + String relationshipLabel = GraphHelper.getTraitLabel(typeName, traitNameToBeDeleted); + deleteEdgeReference(instanceVertex, relationshipLabel, TypeCategory.CLASSIFICATION, false); + } + } + + protected AtlasStructDef.AtlasAttributeDef getAttributeForEdge(String edgeLabel) throws AtlasBaseException { + AtlasEdgeLabel atlasEdgeLabel = new AtlasEdgeLabel(edgeLabel); + + AtlasType parentType = typeRegistry.getType(atlasEdgeLabel.getTypeName()); + AtlasStructType parentStructType = (AtlasStructType) parentType; + + return parentStructType.getAttributeDef(atlasEdgeLabel.getAttributeName()); + } + + protected abstract void _deleteVertex(AtlasVertex instanceVertex, boolean force); + + protected abstract void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseException; + + /** + * Deletes the edge between outvertex and inVertex. The edge is for attribute attributeName of outVertex + * @param outVertex + * @param inVertex + * @param attributeName + * @throws AtlasException + */ + protected void deleteEdgeBetweenVertices(AtlasVertex outVertex, AtlasVertex inVertex, String attributeName) throws AtlasBaseException { + LOG.debug("Removing edge from {} to {} with attribute name {}", string(outVertex), string(inVertex), + attributeName); + String typeName = GraphHelper.getTypeName(outVertex); + String outId = GraphHelper.getGuid(outVertex); + AtlasEntity.Status state = AtlasGraphUtilsV1.getState(outVertex); + if ((outId != null && RequestContextV1.get().isDeletedEntity(outId)) || state == AtlasEntity.Status.DELETED) { + //If the reference vertex is marked for deletion, skip updating the reference + return; + } + + AtlasStructType parentType = (AtlasStructType) typeRegistry.getType(typeName); + String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(parentType, attributeName); + String edgeLabel = EDGE_LABEL_PREFIX + propertyName; + AtlasEdge edge = null; + + AtlasStructDef.AtlasAttributeDef attrDef = parentType.getAttributeDef(attributeName); + AtlasType attrType = typeRegistry.getType(attrDef.getTypeName()); + + switch (attrType.getTypeCategory()) { + case ENTITY: + //If its class attribute, its the only edge between two vertices + if (attrDef.getIsOptional()) { + edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel); + if (shouldUpdateReverseAttribute) { + GraphHelper.setProperty(outVertex, propertyName, null); + } + } else { + // Cannot unset a required attribute. + throw new AtlasBaseException("Cannot unset required attribute " + propertyName + + " on " + GraphHelper.getVertexDetails(outVertex) + " edge = " + edgeLabel); + } + break; + + case ARRAY: + //If its array attribute, find the right edge between the two vertices and update array property + List<String> elements = GraphHelper.getListProperty(outVertex, propertyName); + if (elements != null) { + elements = new ArrayList<>(elements); //Make a copy, else list.remove reflects on titan.getProperty() + for (String elementEdgeId : elements) { + AtlasEdge elementEdge = graphHelper.getEdgeByEdgeId(outVertex, edgeLabel, elementEdgeId); + if (elementEdge == null) { + continue; + } + + AtlasVertex elementVertex = elementEdge.getInVertex(); + if (elementVertex.equals(inVertex)) { + edge = elementEdge; + + //TODO element.size includes deleted items as well. should exclude + if (!attrDef.getIsOptional() + && elements.size() <= attrDef.getValuesMinCount()) { + // Deleting this edge would violate the attribute's lower bound. + throw new AtlasBaseException( + "Cannot remove array element from required attribute " + + propertyName + " on " + + GraphHelper.getVertexDetails(outVertex) + " " + GraphHelper.getEdgeDetails(elementEdge)); + } + + if (shouldUpdateReverseAttribute) { + //if composite attribute, remove the reference as well. else, just remove the edge + //for example, when table is deleted, process still references the table + //but when column is deleted, table will not reference the deleted column + LOG.debug("Removing edge {} from the array attribute {}", string(elementEdge), + attributeName); + elements.remove(elementEdge.getId().toString()); + GraphHelper.setProperty(outVertex, propertyName, elements); + break; + + } + } + } + } + break; + + case MAP: + //If its map attribute, find the right edge between two vertices and update map property + List<String> keys = GraphHelper.getListProperty(outVertex, propertyName); + if (keys != null) { + keys = new ArrayList<>(keys); //Make a copy, else list.remove reflects on titan.getProperty() + for (String key : keys) { + String keyPropertyName = GraphHelper.getQualifiedNameForMapKey(propertyName, key); + String mapEdgeId = GraphHelper.getSingleValuedProperty(outVertex, keyPropertyName, String.class); + AtlasEdge mapEdge = graphHelper.getEdgeByEdgeId(outVertex, keyPropertyName, mapEdgeId); + if(mapEdge != null) { + AtlasVertex mapVertex = mapEdge.getInVertex(); + if (mapVertex.getId().toString().equals(inVertex.getId().toString())) { + //TODO keys.size includes deleted items as well. should exclude + if (attrDef.getIsOptional() || keys.size() > attrDef.getValuesMinCount()) { + edge = mapEdge; + } else { + // Deleting this entry would violate the attribute's lower bound. + throw new AtlasBaseException( + "Cannot remove map entry " + keyPropertyName + " from required attribute " + + propertyName + " on " + GraphHelper.getVertexDetails(outVertex) + " " + GraphHelper.getEdgeDetails(mapEdge)); + } + + if (shouldUpdateReverseAttribute) { + //remove this key + LOG.debug("Removing edge {}, key {} from the map attribute {}", string(mapEdge), key, + attributeName); + keys.remove(key); + GraphHelper.setProperty(outVertex, propertyName, keys); + GraphHelper.setProperty(outVertex, keyPropertyName, null); + } + break; + } + } + } + } + break; + + case STRUCT: + case CLASSIFICATION: + break; + + default: + throw new IllegalStateException("There can't be an edge from " + GraphHelper.getVertexDetails(outVertex) + " to " + + GraphHelper.getVertexDetails(inVertex) + " with attribute name " + attributeName + " which is not class/array/map attribute"); + } + + if (edge != null) { + deleteEdge(edge, false); + RequestContextV1 requestContext = RequestContextV1.get(); + GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, + requestContext.getRequestTime()); + GraphHelper.setProperty(outVertex, Constants.MODIFIED_BY_KEY, requestContext.getUser()); + requestContext.recordEntityUpdate(outId); + } + } + + protected void deleteVertex(AtlasVertex instanceVertex, boolean force) throws AtlasBaseException { + //Update external references(incoming edges) to this vertex + LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex)); + + for (AtlasEdge edge : (Iterable<AtlasEdge>) instanceVertex.getEdges(AtlasEdgeDirection.IN)) { + AtlasEntity.Status edgeState = AtlasGraphUtilsV1.getState(edge); + if (edgeState == AtlasEntity.Status.ACTIVE) { + //Delete only the active edge references + AtlasStructDef.AtlasAttributeDef attribute = getAttributeForEdge(edge.getLabel()); + //TODO use delete edge instead?? + deleteEdgeBetweenVertices(edge.getOutVertex(), edge.getInVertex(), attribute.getName()); + } + } + _deleteVertex(instanceVertex, force); + } + + private Collection<AtlasStructType.AtlasAttribute> getAttributes(AtlasStructType structType) { + Collection<AtlasStructType.AtlasAttribute> ret = null; + + if (structType.getTypeCategory() == TypeCategory.STRUCT) { + ret = structType.getAllAttributes().values(); + } else if (structType.getTypeCategory() == TypeCategory.CLASSIFICATION) { + ret = ((AtlasClassificationType)structType).getAllAttributes().values(); + } else if (structType.getTypeCategory() == TypeCategory.ENTITY) { + ret = ((AtlasEntityType)structType).getAllAttributes().values(); + } else { + ret = Collections.emptyList(); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java new file mode 100644 index 0000000..174e490 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import com.google.inject.Inject; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +public class EntityGraphMapper implements InstanceGraphMapper<AtlasEdge> { + + private static final Logger LOG = LoggerFactory.getLogger(EntityGraphMapper.class); + + protected final GraphHelper graphHelper = GraphHelper.getInstance(); + + protected EntityMutationContext context; + + protected final StructVertexMapper structVertexMapper; + + @Inject + public EntityGraphMapper(ArrayVertexMapper arrayVertexMapper, MapVertexMapper mapVertexMapper) { + this.structVertexMapper = new StructVertexMapper(arrayVertexMapper, mapVertexMapper); + arrayVertexMapper.init(structVertexMapper); + mapVertexMapper.init(structVertexMapper); + } + + public AtlasVertex createVertexTemplate(final AtlasStruct instance, final AtlasStructType structType) { + AtlasVertex vertex = structVertexMapper.createVertexTemplate(instance, structType); + + AtlasEntityType entityType = (AtlasEntityType) structType; + AtlasEntity entity = (AtlasEntity) instance; + + // add super types + for (String superTypeName : entityType.getAllSuperTypes()) { + AtlasGraphUtilsV1.addProperty(vertex, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName); + } + + final String guid = UUID.randomUUID().toString(); + + // add identity + AtlasGraphUtilsV1.setProperty(vertex, Constants.GUID_PROPERTY_KEY, guid); + + // add version information + AtlasGraphUtilsV1.setProperty(vertex, Constants.VERSION_PROPERTY_KEY, Integer.valueOf(entity.getVersion().intValue())); + + return vertex; + } + + + @Override + public AtlasEdge toGraph(GraphMutationContext ctx) throws AtlasBaseException { + AtlasEdge result = null; + + String guid = getId(ctx.getValue()); + + AtlasVertex entityVertex = context.getDiscoveryContext().getResolvedReference(guid); + String edgeLabel = AtlasGraphUtilsV1.getAttributeEdgeLabel(ctx.getParentType(), ctx.getAttributeDef().getName()); + if ( ctx.getCurrentEdge().isPresent() ) { + updateEdge(ctx.getAttributeDef(), ctx.getValue(), ctx.getCurrentEdge().get(), entityVertex); + result = ctx.getCurrentEdge().get(); + } else { + try { + result = graphHelper.getOrCreateEdge(ctx.getReferringVertex(), entityVertex, edgeLabel); + } catch (RepositoryException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + } + + return result; + } + + @Override + public void cleanUp() throws AtlasBaseException { + } + + private AtlasEdge updateEdge(AtlasStructDef.AtlasAttributeDef attributeDef, Object value, AtlasEdge currentEdge, final AtlasVertex entityVertex) throws AtlasBaseException { + + LOG.debug("Updating entity reference {} for reference attribute {}", attributeDef.getName()); + // Update edge if it exists + + AtlasVertex currentVertex = currentEdge.getOutVertex(); + String currentEntityId = AtlasGraphUtilsV1.getIdFromVertex(currentVertex); + String newEntityId = getId(value); + AtlasEdge newEdge = currentEdge; + if (!currentEntityId.equals(newEntityId)) { + // add an edge to the class vertex from the instance + if (entityVertex != null) { + try { + newEdge = graphHelper.getOrCreateEdge(currentEdge.getInVertex(), entityVertex, currentEdge.getLabel()); + } catch (RepositoryException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + + } + } + return newEdge; + } + + public EntityMutationResponse mapAttributes(EntityMutationContext ctx) throws AtlasBaseException { + + this.context = ctx; + structVertexMapper.init(this); + + EntityMutationResponse resp = new EntityMutationResponse(); + //Map attributes + if (ctx.getCreatedEntities() != null) { + for (AtlasEntity createdEntity : ctx.getCreatedEntities()) { + AtlasVertex vertex = ctx.getVertex(createdEntity); + structVertexMapper.mapAttributestoVertex((AtlasStructType) ctx.getType(createdEntity), createdEntity, vertex); + resp.addEntity(EntityMutations.EntityOperation.CREATE, constructHeader(createdEntity, vertex)); + } + } + + if (ctx.getUpdatedEntities() != null) { + for (AtlasEntity updated : ctx.getUpdatedEntities()) { + AtlasVertex vertex = ctx.getVertex(updated); + structVertexMapper.mapAttributestoVertex((AtlasStructType) ctx.getType(updated), updated, vertex); + + resp.addEntity(EntityMutations.EntityOperation.UPDATE, constructHeader(updated, vertex)); + } + } + + return resp; + } + + + public String getId(Object value) throws AtlasBaseException { + if ( value != null) { + if ( value instanceof AtlasObjectId) { + return ((AtlasObjectId) value).getGuid(); + } else if (value instanceof AtlasEntity) { + return ((AtlasEntity) value).getGuid(); + } + } + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, (String) value); + } + + private AtlasEntityHeader constructHeader(AtlasEntity entity, AtlasVertex vertex) { + //TODO - enhance to return only selective attributes + return new AtlasEntityHeader(entity.getTypeName(), AtlasGraphUtilsV1.getIdFromVertex(vertex), entity.getAttributes()); + } + + public EntityMutationContext getContext() { + return context; + } + + public AtlasEntityType getInstanceType(Object val) throws AtlasBaseException { + String guid = getId(val); + return (AtlasEntityType) getContext().getType(guid); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java new file mode 100644 index 0000000..f942a91 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.model.instance.AtlasEntity; + +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.type.AtlasType; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EntityMutationContext { + + private List<AtlasEntity> entitiesCreated = new ArrayList<>(); + private List<AtlasEntity> entitiesUpdated = new ArrayList<>(); + + private EntityGraphDiscoveryContext context; + private Map<String, AtlasType> entityVsType = new HashMap<>(); + private Map<String, AtlasVertex> entityVsVertex = new HashMap<>(); + + public EntityMutationContext(final EntityGraphDiscoveryContext context) { + this.context = context; + } + + public void addCreated(AtlasEntity entity, AtlasType type, AtlasVertex atlasVertex) { + entitiesCreated.add(entity); + entityVsVertex.put(entity.getGuid(), atlasVertex); + entityVsType.put(entity.getGuid(), type); + } + + public void addUpdated(AtlasEntity entity, AtlasType type, AtlasVertex atlasVertex) { + entitiesUpdated.add(entity); + entityVsVertex.put(entity.getGuid(), atlasVertex); + entityVsType.put(entity.getGuid(), type); + } + + public Collection<AtlasEntity> getCreatedEntities() { + return entitiesCreated; + } + + public Collection<AtlasEntity> getUpdatedEntities() { + return entitiesUpdated; + } + + public AtlasType getType(AtlasEntity entity) { + return entityVsType.get(entity.getGuid()); + } + + public AtlasType getType(String entityId) { + return entityVsType.get(entityId); + } + + public AtlasVertex getVertex(AtlasEntity entity) { + return entityVsVertex.get(entity.getGuid()); + } + + public AtlasVertex getVertex(String entityId) { + return entityVsVertex.get(entityId); + } + + public EntityGraphDiscoveryContext getDiscoveryContext() { + return this.context; + } + + //TODO - equals/hashCode/toString + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final EntityMutationContext that = (EntityMutationContext) o; + + if (entitiesCreated != null ? !entitiesCreated.equals(that.entitiesCreated) : that.entitiesCreated != null) + return false; + if (entitiesUpdated != null ? !entitiesUpdated.equals(that.entitiesUpdated) : that.entitiesUpdated != null) + return false; + if (context != null ? !context.equals(that.context) : that.context != null) return false; + if (entityVsType != null ? !entityVsType.equals(that.entityVsType) : that.entityVsType != null) return false; + return !(entityVsVertex != null ? !entityVsVertex.equals(that.entityVsVertex) : that.entityVsVertex != null); + + } + + @Override + public int hashCode() { + int result = entitiesCreated != null ? entitiesCreated.hashCode() : 0; + result = 31 * result + (entitiesUpdated != null ? entitiesUpdated.hashCode() : 0); + result = 31 * result + (context != null ? context.hashCode() : 0); + result = 31 * result + (entityVsType != null ? entityVsType.hashCode() : 0); + result = 31 * result + (entityVsVertex != null ? entityVsVertex.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "EntityMutationContext{" + + "entitiesCreated=" + entitiesCreated + + ", entitiesUpdated=" + entitiesUpdated + + ", context=" + context + + ", entityVsType=" + entityVsType + + ", entityVsVertex=" + entityVsVertex + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java new file mode 100644 index 0000000..d5ba7e1 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/GraphMutationContext.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import com.google.common.base.Optional; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; + +import java.util.Objects; + +public class GraphMutationContext { + + + /** + * Atlas Attribute + */ + + private AtlasStructType.AtlasAttribute attribute; + + /** + * Overriding type for which elements are being mapped + */ + private AtlasType currentElementType; + + /** + * Current attribute value/entity/Struct instance + */ + private Object value; + + /** + * + * The vertex which corresponds to the entity/struct for which we are mapping a complex attributes like struct, traits + */ + AtlasVertex referringVertex; + + /** + * the vertex property that we are updating + */ + + String vertexPropertyKey; + + /** + * The current edge(in case of updates) from the parent entity/struct to the complex attribute like struct, trait + */ + Optional<AtlasEdge> existingEdge; + + + private GraphMutationContext(final Builder builder) { + this.attribute = builder.attribute; + this.currentElementType = builder.elementType; + this.existingEdge = builder.currentEdge; + this.value = builder.currentValue; + this.referringVertex = builder.referringVertex; + this.vertexPropertyKey = builder.vertexPropertyKey; + } + + public String getVertexPropertyKey() { + return vertexPropertyKey; + } + + @Override + public int hashCode() { + return Objects.hash(attribute, value, referringVertex, vertexPropertyKey, existingEdge); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } else if (obj == this) { + return true; + } else if (obj.getClass() != getClass()) { + return false; + } else { + GraphMutationContext rhs = (GraphMutationContext) obj; + return Objects.equals(attribute, rhs.getAttribute()) + && Objects.equals(value, rhs.getValue()) + && Objects.equals(referringVertex, rhs.getReferringVertex()) + && Objects.equals(vertexPropertyKey, rhs.getReferringVertex()) + && Objects.equals(existingEdge, rhs.getCurrentEdge()); + } + } + + + public static final class Builder { + + private final AtlasStructType.AtlasAttribute attribute; + + private final AtlasType elementType; + + private final Object currentValue; + + private AtlasVertex referringVertex; + + private Optional<AtlasEdge> currentEdge = Optional.absent(); + + private String vertexPropertyKey; + + + public Builder(AtlasStructType.AtlasAttribute attribute, AtlasType currentElementType, Object currentValue) { + this.attribute = attribute; + this.elementType = currentElementType; + this.currentValue = currentValue; + } + + public Builder(AtlasStructType.AtlasAttribute attribute, Object currentValue) { + this.attribute = attribute; + this.elementType = null; + this.currentValue = currentValue; + } + + Builder referringVertex(AtlasVertex referringVertex) { + this.referringVertex = referringVertex; + return this; + } + + Builder edge(AtlasEdge edge) { + this.currentEdge = Optional.of(edge); + return this; + } + + Builder edge(Optional<AtlasEdge> edge) { + this.currentEdge = edge; + return this; + } + + Builder vertexProperty(String propertyKey) { + this.vertexPropertyKey = propertyKey; + return this; + } + + GraphMutationContext build() { + return new GraphMutationContext(this); + } + } + + public AtlasStructType getParentType() { + return attribute.getStructType(); + } + + public AtlasStructDef getStructDef() { + return attribute.getStructDef(); + } + + public AtlasStructDef.AtlasAttributeDef getAttributeDef() { + return attribute.getAttributeDef(); + } + + public AtlasType getAttrType() { + return currentElementType == null ? attribute.getAttributeType() : currentElementType; + } + + public AtlasType getCurrentElementType() { + return currentElementType; + } + + public Object getValue() { + return value; + } + + public AtlasVertex getReferringVertex() { + return referringVertex; + } + + public Optional<AtlasEdge> getCurrentEdge() { + return existingEdge; + } + + public void setElementType(final AtlasType attrType) { + this.currentElementType = attrType; + } + + public AtlasStructType.AtlasAttribute getAttribute() { + return attribute; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java new file mode 100644 index 0000000..fe0db39 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v1; + +import com.google.inject.Inject; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; + +public class HardDeleteHandlerV1 extends DeleteHandlerV1 { + + @Inject + public HardDeleteHandlerV1(AtlasTypeRegistry typeRegistry) { + super(typeRegistry, false, true); + } + + @Override + protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) { + graphHelper.removeVertex(instanceVertex); + } + + @Override + protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseException { + graphHelper.removeEdge(edge); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java new file mode 100644 index 0000000..488f141 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import com.google.common.base.Optional; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.EntityResolver; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.typesystem.persistence.Id; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IDBasedEntityResolver implements EntityResolver { + + private Map<String, AtlasEntity> idToEntityMap = new HashMap<>(); + + private final GraphHelper graphHelper = GraphHelper.getInstance(); + + private EntityGraphDiscoveryContext context; + + @Override + public void init(EntityGraphDiscoveryContext context) throws AtlasBaseException { + this.context = context; + for (AtlasEntity entity : context.getRootEntities()) { + idToEntityMap.put(entity.getGuid(), entity); + } + } + + public EntityGraphDiscoveryContext resolveEntityReferences() throws AtlasBaseException { + + if ( context == null) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Entity resolver not initialized"); + } + + List<AtlasObjectId> resolvedReferences = new ArrayList<>(); + + for (AtlasObjectId typeIdPair : context.getUnresolvedIdReferences()) { + if ( AtlasEntity.isAssigned(typeIdPair.getGuid())) { + //validate in graph repo that given guid, typename exists + Optional<AtlasVertex> vertex = resolveGuid(typeIdPair); + + if ( vertex.isPresent() ) { + context.addRepositoryResolvedReference(typeIdPair, vertex.get()); + resolvedReferences.add(typeIdPair); + } + } else { + //check if root references have this temporary id + if (!idToEntityMap.containsKey(typeIdPair.getGuid()) ) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, "Could not find an entity with the specified id " + typeIdPair + " in the request"); + } + } + } + + context.removeUnResolvedIdReferences(resolvedReferences); + + //Resolve root references + for (AtlasEntity entity : context.getRootEntities()) { + if ( !context.isResolved(entity.getGuid()) && AtlasEntity.isAssigned(entity.getGuid())) { + AtlasObjectId typeIdPair = new AtlasObjectId(entity.getTypeName(), entity.getGuid()); + Optional<AtlasVertex> vertex = resolveGuid(typeIdPair); + if (vertex.isPresent()) { + context.addRepositoryResolvedReference(typeIdPair, vertex.get()); + context.removeUnResolvedIdReference(typeIdPair); + } + } + } + return context; + } + + private Optional<AtlasVertex> resolveGuid(AtlasObjectId typeIdPair) throws AtlasBaseException { + //validate in graph repo that given guid, typename exists + AtlasVertex vertex = null; + try { + vertex = graphHelper.findVertex(Constants.GUID_PROPERTY_KEY, typeIdPair.getGuid(), + Constants.TYPE_NAME_PROPERTY_KEY, typeIdPair.getTypeName(), + Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name()); + } catch (EntityNotFoundException e) { + //Ignore + } + if ( vertex != null ) { + return Optional.of(vertex); + } else { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, "Could not find an entity with the specified guid " + typeIdPair.getGuid() + " in Atlas respository"); + } + } + + @Override + public void cleanUp() throws AtlasBaseException { + idToEntityMap.clear(); + this.context = null; + } + +}
