Repository: incubator-atlas Updated Branches: refs/heads/master 511c88670 -> 2f1cb57a7
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java new file mode 100644 index 0000000..7e87d39 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InstanceGraphMapper.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; + +public interface InstanceGraphMapper<T> { + + /** + * Map the given type instance to the graph + * + * @param ctx + * @return the value that was mapped to the vertex + * @throws AtlasBaseException + */ + T toGraph(GraphMutationContext ctx) throws AtlasBaseException; + + + void cleanUp() throws AtlasBaseException; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java new file mode 100644 index 0000000..9d219f5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/MapVertexMapper.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Provider; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.repository.graph.GraphHelper.string; + +public class MapVertexMapper implements InstanceGraphMapper<Map> { + + private DeleteHandlerV1 deleteHandler; + + private static final Logger LOG = LoggerFactory.getLogger(MapVertexMapper.class); + + private StructVertexMapper structVertexMapper; + + @Inject + public MapVertexMapper(DeleteHandlerV1 deleteHandler) { + this.deleteHandler = deleteHandler; + } + + void init(StructVertexMapper structVertexMapper) { + this.structVertexMapper = structVertexMapper; + } + + @Override + public Map<String, Object> toGraph(GraphMutationContext ctx) throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Mapping instance to vertex {} for map type {}", string(ctx.getReferringVertex()), ctx.getAttrType().getTypeName()); + } + + @SuppressWarnings("unchecked") Map<Object, Object> newVal = + (Map<Object, Object>) ctx.getValue(); + + boolean newAttributeEmpty = MapUtils.isEmpty(newVal); + + Map<String, Object> currentMap = new HashMap<>(); + Map<String, Object> newMap = new HashMap<>(); + + AtlasMapType mapType = (AtlasMapType) ctx.getAttrType(); + + try { + List<String> currentKeys = GraphHelper.getListProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey()); + if (currentKeys != null && !currentKeys.isEmpty()) { + for (String key : currentKeys) { + String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexPropertyKey(), key); + Object propertyValueForKey = getMapValueProperty(mapType.getValueType(), ctx.getReferringVertex(), propertyNameForKey); + currentMap.put(key, propertyValueForKey); + } + } + + if (!newAttributeEmpty) { + for (Map.Entry<Object, Object> entry : newVal.entrySet()) { + String keyStr = entry.getKey().toString(); + String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(ctx.getVertexPropertyKey(), keyStr); + Optional<AtlasEdge> existingEdge = getEdgeIfExists(mapType, currentMap, keyStr); + + GraphMutationContext mapCtx = new GraphMutationContext.Builder(ctx.getAttribute(), mapType.getValueType(), entry.getValue()) + .referringVertex(ctx.getReferringVertex()) + .edge(existingEdge) + .vertexProperty(propertyNameForKey).build(); + + + Object newEntry = structVertexMapper.mapCollectionElementsToVertex(mapCtx); + newMap.put(keyStr, newEntry); + } + } + + Map<String, Object> finalMap = + removeUnusedMapEntries(ctx.getParentType(), mapType, ctx.getAttributeDef(), ctx.getReferringVertex(), ctx.getVertexPropertyKey(), currentMap, newMap); + + Set<String> newKeys = new HashSet<>(newMap.keySet()); + newKeys.addAll(finalMap.keySet()); + + // for dereference on way out + GraphHelper.setListProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey(), new ArrayList<>(newKeys)); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Map values set in vertex {} {}", mapType.getTypeName(), newMap); + } + + return newMap; + } + + @Override + public void cleanUp() throws AtlasBaseException { + } + + + public static Object getMapValueProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName) { + String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); + if (AtlasGraphUtilsV1.isReference(elementType)) { + return instanceVertex.getProperty(actualPropertyName, AtlasEdge.class); + } + else { + return instanceVertex.getProperty(actualPropertyName, String.class).toString(); + } + } + + public static void setMapValueProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName, Object value) { + String actualPropertyName = GraphHelper.encodePropertyKey(propertyName); + if (AtlasGraphUtilsV1.isReference(elementType)) { + instanceVertex.setPropertyFromElementId(actualPropertyName, (AtlasEdge)value); + } + else { + instanceVertex.setProperty(actualPropertyName, value); + } + } + + //Remove unused entries from map + private Map<String, Object> removeUnusedMapEntries( + AtlasStructType entityType, + AtlasMapType mapType, AtlasStructDef.AtlasAttributeDef attributeDef, + AtlasVertex instanceVertex, String propertyName, + Map<String, Object> currentMap, + Map<String, Object> newMap) + throws AtlasException, AtlasBaseException { + + Map<String, Object> additionalMap = new HashMap<>(); + for (String currentKey : currentMap.keySet()) { + + boolean shouldDeleteKey = !newMap.containsKey(currentKey); + if (AtlasGraphUtilsV1.isReference(mapType.getValueType())) { + + //Delete the edge reference if its not part of new edges created/updated + AtlasEdge currentEdge = (AtlasEdge)currentMap.get(currentKey); + + if (!newMap.values().contains(currentEdge)) { + boolean deleteChildReferences = StructVertexMapper.shouldManageChildReferences(entityType, attributeDef.getName()); + boolean deleted = + deleteHandler.deleteEdgeReference(currentEdge, mapType.getValueType().getTypeCategory(), deleteChildReferences, true); + if (!deleted) { + additionalMap.put(currentKey, currentEdge); + shouldDeleteKey = false; + } + } + } + + if (shouldDeleteKey) { + String propertyNameForKey = GraphHelper.getQualifiedNameForMapKey(propertyName, currentKey); + GraphHelper.setProperty(instanceVertex, propertyNameForKey, null); + } + } + return additionalMap; + } + + private Optional<AtlasEdge> getEdgeIfExists(AtlasMapType mapType, Map<String, Object> currentMap, String keyStr) { + Optional<AtlasEdge> existingEdge = Optional.absent(); + if ( AtlasGraphUtilsV1.isReference(mapType.getValueType()) ) { + existingEdge = Optional.of((AtlasEdge) currentMap.get(keyStr)); + } + + return existingEdge; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java new file mode 100644 index 0000000..7e3068b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v1; + +import com.google.inject.Inject; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.RequestContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.persistence.Id; + +import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; + +public class SoftDeleteHandlerV1 extends DeleteHandlerV1 { + + @Inject + public SoftDeleteHandlerV1(AtlasTypeRegistry typeRegistry) { + super(typeRegistry, false, true); + } + + @Override + protected void _deleteVertex(AtlasVertex instanceVertex, boolean force) { + if (force) { + graphHelper.removeVertex(instanceVertex); + } else { + AtlasEntity.Status state = AtlasGraphUtilsV1.getState(instanceVertex); + if (state != AtlasEntity.Status.DELETED) { + GraphHelper.setProperty(instanceVertex, STATE_PROPERTY_KEY, Id.EntityState.DELETED.name()); + GraphHelper.setProperty(instanceVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, + RequestContext.get().getRequestTime()); + GraphHelper.setProperty(instanceVertex, MODIFIED_BY_KEY, RequestContext.get().getUser()); + } + } + } + + @Override + protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseException { + if (force) { + graphHelper.removeEdge(edge); + } else { + Id.EntityState state = GraphHelper.getState(edge); + if (state != Id.EntityState.DELETED) { + GraphHelper.setProperty(edge, STATE_PROPERTY_KEY, Id.EntityState.DELETED.name()); + GraphHelper + .setProperty(edge, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + GraphHelper.setProperty(edge, MODIFIED_BY_KEY, RequestContext.get().getUser()); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java new file mode 100644 index 0000000..ae9ecc4 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/StructVertexMapper.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StructVertexMapper implements InstanceGraphMapper<AtlasEdge> { + + private final AtlasGraph graph; + + private final GraphHelper graphHelper = GraphHelper.getInstance(); + + private final MapVertexMapper mapVertexMapper; + + private final ArrayVertexMapper arrVertexMapper; + + private EntityGraphMapper entityVertexMapper; + + private static final Logger LOG = LoggerFactory.getLogger(StructVertexMapper.class); + + public StructVertexMapper(ArrayVertexMapper arrayVertexMapper, MapVertexMapper mapVertexMapper) { + this.graph = AtlasGraphProvider.getGraphInstance();; + this.mapVertexMapper = mapVertexMapper; + this.arrVertexMapper = arrayVertexMapper; + } + + void init(final EntityGraphMapper entityVertexMapper) { + this.entityVertexMapper = entityVertexMapper; + } + + @Override + public AtlasEdge toGraph(GraphMutationContext ctx) throws AtlasBaseException { + AtlasEdge result = null; + + String edgeLabel = AtlasGraphUtilsV1.getAttributeEdgeLabel(ctx.getParentType(), ctx.getAttributeDef().getName()); + + if ( ctx.getCurrentEdge().isPresent() ) { + updateVertex(ctx.getParentType(), (AtlasStructType) ctx.getAttrType(), ctx.getAttributeDef(), (AtlasStruct) ctx.getValue(), ctx.getCurrentEdge().get().getOutVertex()); + result = ctx.getCurrentEdge().get(); + } else { + result = createVertex(ctx.getParentType(), (AtlasStructType) ctx.getAttrType(), ctx.getAttributeDef(), (AtlasStruct) ctx.getValue(), ctx.getReferringVertex(), edgeLabel); + } + + return result; + } + + @Override + public void cleanUp() throws AtlasBaseException { + } + + public static boolean shouldManageChildReferences(AtlasStructType type, String attributeName) { + return type.isMappedFromRefAttribute(attributeName); + } + + /** + * Map attributes for entity, struct or trait + * @param structType + * @param struct + * @param vertex + * @return + * @throws AtlasBaseException + */ + public AtlasVertex mapAttributestoVertex(AtlasStructType structType, AtlasStruct struct, AtlasVertex vertex) throws AtlasBaseException { + if (struct.getAttributes() != null) { + for (String attrName : struct.getAttributes().keySet()) { + Object value = struct.getAttribute(attrName); + AtlasType attributeType = structType.getAttributeType(attrName); + if ( attributeType != null) { + final AtlasStructType.AtlasAttribute attribute = structType.getAttribute(attrName); + + GraphMutationContext ctx = new GraphMutationContext.Builder(attribute, value) + .referringVertex(vertex) + .vertexProperty(AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(structType, attrName)).build(); + mapToVertexByTypeCategory(ctx); + } + } + + //Set updated timestamp + AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime()); + GraphHelper.setProperty(vertex, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser()); + } + return vertex; + } + + protected Object mapToVertexByTypeCategory(GraphMutationContext ctx) throws AtlasBaseException { + switch(ctx.getAttrType().getTypeCategory()) { + case PRIMITIVE: + case ENUM: + return primitivesToVertex(ctx); + case STRUCT: + return toGraph(ctx); + case ENTITY: + AtlasEntityType instanceType = entityVertexMapper.getInstanceType(ctx.getValue()); + ctx.setElementType(instanceType); + return entityVertexMapper.toGraph(ctx); + case MAP: + return mapVertexMapper.toGraph(ctx); + case ARRAY: + return arrVertexMapper.toGraph(ctx); + default: + throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name()); + } + } + + protected Object primitivesToVertex(GraphMutationContext ctx) { + if ( ctx.getAttrType().getTypeCategory() == TypeCategory.MAP ) { + MapVertexMapper.setMapValueProperty(((AtlasMapType) ctx.getAttrType()).getValueType(), ctx.getReferringVertex(), ctx.getVertexPropertyKey(), ctx.getValue()); + } else { + AtlasGraphUtilsV1.setProperty(ctx.getReferringVertex(), ctx.getVertexPropertyKey(), ctx.getValue()); + } + return ctx.getValue(); + } + + private AtlasEdge createVertex(AtlasStructType parentType, AtlasStructType attrType, AtlasStructDef.AtlasAttributeDef attributeDef, AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel) throws AtlasBaseException { + AtlasVertex vertex = createVertexTemplate(struct, attrType); + mapAttributestoVertex(attrType, struct, vertex); + + try { + //TODO - Map directly in AtlasGraphUtilsV1 + return graphHelper.getOrCreateEdge(referringVertex, vertex, edgeLabel); + } catch (RepositoryException e) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + } + + private void updateVertex(AtlasStructType parentType, AtlasStructType structAttributeType, AtlasStructDef.AtlasAttributeDef attributeDef, AtlasStruct value, AtlasVertex structVertex) throws AtlasBaseException { + mapAttributestoVertex(structAttributeType, value, structVertex); + } + + protected AtlasVertex createVertexTemplate(final AtlasStruct instance, final AtlasStructType structType) { + LOG.debug("Creating AtlasVertex for type {}", instance.getTypeName()); + final AtlasVertex vertexWithoutIdentity = graph.addVertex(); + + // add type information + AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.ENTITY_TYPE_PROPERTY_KEY, instance.getTypeName()); + + // add state information + AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); + + // add timestamp information + AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime()); + AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, + RequestContextV1.get().getRequestTime()); + + AtlasGraphUtilsV1.setProperty(vertexWithoutIdentity, Constants.CREATED_BY_KEY, RequestContextV1.get().getUser()); + + GraphHelper.setProperty(vertexWithoutIdentity, Constants.MODIFIED_BY_KEY, RequestContextV1.get().getUser()); + + return vertexWithoutIdentity; + } + + protected Object mapCollectionElementsToVertex(GraphMutationContext ctx) throws AtlasBaseException { + switch(ctx.getAttrType().getTypeCategory()) { + case PRIMITIVE: + case ENUM: + return primitivesToVertex(ctx); + case STRUCT: + return toGraph(ctx); + case ENTITY: + AtlasEntityType instanceType = entityVertexMapper.getInstanceType(ctx.getValue()); + ctx.setElementType(instanceType); + return entityVertexMapper.toGraph(ctx); + case MAP: + case ARRAY: + default: + throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, ctx.getAttrType().getTypeCategory().name()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java new file mode 100644 index 0000000..8025f1e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/UniqAttrBasedEntityResolver.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import com.google.common.base.Optional; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.EntityResolver; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; + +public class UniqAttrBasedEntityResolver implements EntityResolver { + + private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class); + + private final AtlasTypeRegistry typeRegistry; + + private final GraphHelper graphHelper = GraphHelper.getInstance(); + + private EntityGraphDiscoveryContext context; + + @Inject + public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + } + + @Override + public void init(EntityGraphDiscoveryContext entities) throws AtlasBaseException { + this.context = entities; + } + + @Override + public EntityGraphDiscoveryContext resolveEntityReferences() throws AtlasBaseException { + + if ( context == null) { + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Unique attribute based entity resolver not initialized"); + } + + //Resolve attribute references + List<AtlasEntity> resolvedReferences = new ArrayList<>(); + + for (AtlasEntity entity : context.getUnResolvedEntityReferences()) { + //query in graph repo that given unique attribute - check for deleted also? + Optional<AtlasVertex> vertex = resolveByUniqueAttribute(entity); + if (vertex.isPresent()) { + context.addRepositoryResolvedReference(new AtlasObjectId(entity.getTypeName(), entity.getGuid()), vertex.get()); + resolvedReferences.add(entity); + } + } + + context.removeUnResolvedEntityReferences(resolvedReferences); + + if (context.getUnResolvedEntityReferences().size() > 0) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, context.getUnResolvedEntityReferences().toString()); + } + + //Resolve root references + for (AtlasEntity entity : context.getRootEntities()) { + if ( !context.isResolved(entity.getGuid()) ) { + Optional<AtlasVertex> vertex = resolveByUniqueAttribute(entity); + if (vertex.isPresent()) { + context.addRepositoryResolvedReference(new AtlasObjectId(entity.getTypeName(), entity.getGuid()), vertex.get()); + } + } + } + + return context; + } + + Optional<AtlasVertex> resolveByUniqueAttribute(AtlasEntity entity) throws AtlasBaseException { + AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + for (AtlasStructType.AtlasAttribute attr : entityType.getAllAttributes().values()) { + if (attr.getAttributeDef().getIsUnique()) { + Object attrVal = entity.getAttribute(attr.getAttributeDef().getName()); + if (attrVal != null) { + String qualifiedAttrName = attr.getQualifiedAttributeName(); + AtlasVertex vertex = null; + try { + vertex = graphHelper.findVertex(qualifiedAttrName, attrVal, + Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName(), + Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE + .name()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Found vertex by unique attribute : " + qualifiedAttrName + "=" + attrVal); + } + if (vertex != null) { + return Optional.of(vertex); + } + } catch (EntityNotFoundException e) { + //Ignore if not found + } + } + } + } + return Optional.absent(); + } + + @Override + public void cleanUp() { + //Nothing to cleanup + this.context = null; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java index ea0e670..6655085 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java @@ -27,6 +27,8 @@ import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; import org.apache.atlas.repository.graph.DeleteHandler; import org.apache.atlas.repository.graph.SoftDeleteHandler; import org.apache.atlas.repository.graphdb.GraphDatabase; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; +import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.atlas.typesystem.types.cache.DefaultTypeCache; import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.commons.configuration.Configuration; @@ -83,6 +85,16 @@ public class AtlasRepositoryConfiguration { throw new RuntimeException(e); } } + + public static Class<? extends DeleteHandlerV1> getDeleteHandlerV1Impl() { + try { + Configuration config = ApplicationProperties.get(); + return ApplicationProperties.getClass(config, + DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandlerV1.class.getName(), DeleteHandlerV1.class); + } catch (AtlasException e) { + throw new RuntimeException(e); + } + } private static final String GRAPH_DATABASE_IMPLEMENTATION_PROPERTY = "atlas.graphdb.backend"; private static final String DEFAULT_GRAPH_DATABASE_IMPLEMENTATION_CLASS = "org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase"; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java index b1dac9d..19124d7 100644 --- a/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/lineage/EntityLineageServiceTest.java @@ -258,7 +258,7 @@ public class EntityLineageServiceTest extends BaseRepositoryTest { assertEquals(relationsInput.size(), 2); AtlasEntityHeader tableEntityInput = entitiesInput.get(entityGuid); - assertEquals(tableEntityInput.getStatus(), Status.STATUS_ACTIVE); + assertEquals(tableEntityInput.getStatus(), Status.ACTIVE); AtlasLineageInfo outputLineage = getOutputLineageInfo(entityGuid, 5); assertNotNull(outputLineage); @@ -273,7 +273,7 @@ public class EntityLineageServiceTest extends BaseRepositoryTest { assertEquals(relationsOutput.size(), 2); AtlasEntityHeader tableEntityOutput = entitiesOutput.get(entityGuid); - assertEquals(tableEntityOutput.getStatus(), Status.STATUS_ACTIVE); + assertEquals(tableEntityOutput.getStatus(), Status.ACTIVE); AtlasLineageInfo bothLineage = getBothLineageInfo(entityGuid, 5); assertNotNull(bothLineage); @@ -288,7 +288,7 @@ public class EntityLineageServiceTest extends BaseRepositoryTest { assertEquals(relationsBoth.size(), 4); AtlasEntityHeader tableEntityBoth = entitiesBoth.get(entityGuid); - assertEquals(tableEntityBoth.getStatus(), Status.STATUS_ACTIVE); + assertEquals(tableEntityBoth.getStatus(), Status.ACTIVE); //Delete the table entity. Lineage for entity returns the same results as before. //Lineage for table name throws EntityNotFoundException @@ -297,17 +297,17 @@ public class EntityLineageServiceTest extends BaseRepositoryTest { inputLineage = getInputLineageInfo(entityGuid, 5); tableEntityInput = inputLineage.getGuidEntityMap().get(entityGuid); - assertEquals(tableEntityInput.getStatus(), Status.STATUS_DELETED); + assertEquals(tableEntityInput.getStatus(), Status.DELETED); assertEquals(inputLineage.getGuidEntityMap().size(), 3); outputLineage = getOutputLineageInfo(entityGuid, 5); tableEntityOutput = outputLineage.getGuidEntityMap().get(entityGuid); - assertEquals(tableEntityOutput.getStatus(), Status.STATUS_DELETED); + assertEquals(tableEntityOutput.getStatus(), Status.DELETED); assertEquals(outputLineage.getGuidEntityMap().size(), 3); bothLineage = getBothLineageInfo(entityGuid, 5); tableEntityBoth = bothLineage.getGuidEntityMap().get(entityGuid); - assertEquals(tableEntityBoth.getStatus(), Status.STATUS_DELETED); + assertEquals(tableEntityBoth.getStatus(), Status.DELETED); assertEquals(bothLineage.getGuidEntityMap().size(), 5); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/test/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStoreTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStoreTest.java index c7c3286..25bae44 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStoreTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStoreTest.java @@ -32,6 +32,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.store.AtlasTypeDefStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; @@ -215,7 +216,7 @@ public class AtlasTypeDefGraphStoreTest { assertTrue(createdTypesDef.getEnumDefs().containsAll(atlasTypesDef.getEnumDefs()), "EnumDefs create failed"); assertTrue(createdTypesDef.getClassificationDefs().containsAll(atlasTypesDef.getClassificationDefs()), "ClassificationDef create failed"); assertTrue(createdTypesDef.getStructDefs().containsAll(atlasTypesDef.getStructDefs()), "StructDef creation failed"); - assertTrue(createdTypesDef.getEntityDefs().containsAll(atlasTypesDef.getEntityDefs()), "EntityDef creation failed"); + Assert.assertEquals(createdTypesDef.getEntityDefs(), atlasTypesDef.getEntityDefs()); } catch (AtlasBaseException e) { fail("Creation of Types should've been a success", e); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java new file mode 100644 index 0000000..0ff33ba --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.TestUtils; +import org.apache.atlas.TestUtilsV2; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.DeleteHandler; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; +import org.apache.atlas.repository.store.graph.EntityResolver; +import org.apache.atlas.services.MetadataService; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.IInstance; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.ITypedStruct; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.util.AtlasRepositoryConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Guice(modules = RepositoryMetadataModule.class) +public class AtlasEntityStoreV1Test { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1Test.class); + + @Inject + AtlasTypeRegistry typeRegistry; + + @Inject + AtlasTypeDefStore typeDefStore; + + AtlasEntityStore entityStore; + + @Inject + MetadataService metadataService; + + private AtlasEntity entityCreated; + + @BeforeClass + public void setUp() throws Exception { + new GraphBackedSearchIndexer(typeRegistry); + final AtlasTypesDef atlasTypesDef = TestUtilsV2.defineDeptEmployeeTypes(); + typeDefStore.createTypesDef(atlasTypesDef); + + entityCreated = TestUtilsV2.createDeptEg1(); + } + + @AfterClass + public void clear() { + AtlasGraphProvider.cleanup(); + } + + @BeforeTest + public void init() throws Exception { + final Class<? extends DeleteHandlerV1> deleteHandlerImpl = AtlasRepositoryConfiguration.getDeleteHandlerV1Impl(); + final Constructor<? extends DeleteHandlerV1> deleteHandlerImplConstructor = deleteHandlerImpl.getConstructor(AtlasTypeRegistry.class); + DeleteHandlerV1 deleteHandler = deleteHandlerImplConstructor.newInstance(typeRegistry); + ArrayVertexMapper arrVertexMapper = new ArrayVertexMapper(deleteHandler); + MapVertexMapper mapVertexMapper = new MapVertexMapper(deleteHandler); + + List<EntityResolver> entityResolvers = new ArrayList<>(); + entityResolvers.add(new IDBasedEntityResolver()); + entityResolvers.add(new UniqAttrBasedEntityResolver(typeRegistry)); + + EntityGraphDiscovery graphDiscovery = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityResolvers); + + entityStore = new AtlasEntityStoreV1(new EntityGraphMapper(arrVertexMapper, mapVertexMapper)); + entityStore.init(typeRegistry, graphDiscovery); + } + + @Test + public void testCreate() throws Exception { + EntityMutationResponse response = entityStore.createOrUpdate(entityCreated); + List<AtlasEntityHeader> entitiesCreated = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); + Assert.assertNotNull(entitiesCreated); + Assert.assertEquals(entitiesCreated.size(), 5); + + AtlasEntityHeader deptEntity = entitiesCreated.get(0); + + //TODO : Use the older API for get until new instance API is ready. + ITypedReferenceableInstance instance = metadataService.getEntityDefinition(deptEntity.getGuid()); + assertAttributes(deptEntity, instance); + + } + + private void assertAttributes(AtlasStruct entity, IInstance instance) throws AtlasBaseException, AtlasException { + LOG.debug("Asserting type : " + entity.getTypeName()); + AtlasStructType entityType = (AtlasStructType) typeRegistry.getType(instance.getTypeName()); + for (String attrName : entity.getAttributes().keySet()) { + Object actual = entity.getAttribute(attrName); + Object expected = instance.get(attrName); + + AtlasType attrType = entityType.getAttributeType(attrName); + assertAttribute(actual, expected, attrType, attrName); + } + } + + private void assertAttribute(Object actual, Object expected, AtlasType attributeType, String attrName) throws AtlasBaseException, AtlasException { + LOG.debug("Asserting attribute : " + attrName); + + switch(attributeType.getTypeCategory()) { + case ENTITY: + if ( expected instanceof Id) { + String guid = ((Id) expected)._getId(); + Assert.assertTrue(AtlasEntity.isAssigned(guid)); + } else { + ReferenceableInstance expectedInstance = (ReferenceableInstance) expected; + AtlasEntity actualInstance = (AtlasEntity) actual; + assertAttributes(actualInstance, expectedInstance); + } + break; + case PRIMITIVE: + case ENUM: + Assert.assertEquals(actual, expected); + break; + case MAP: + AtlasMapType mapType = (AtlasMapType) attributeType; + AtlasType keyType = mapType.getKeyType(); + AtlasType valueType = mapType.getValueType(); + Map actualMap = (Map) actual; + Map expectedMap = (Map) expected; + + Assert.assertEquals(actualMap.size(), expectedMap.size()); + for (Object key : actualMap.keySet()) { + assertAttribute(actualMap.get(key), expectedMap.get(key), valueType, attrName); + } + break; + case ARRAY: + AtlasArrayType arrType = (AtlasArrayType) attributeType; + AtlasType elemType = arrType.getElementType(); + List actualList = (List) actual; + List expectedList = (List) expected; + + if (!(expected == null && actualList.size() == 0)) { + Assert.assertEquals(actualList.size(), expectedList.size()); + for (int i = 0; i < actualList.size(); i++) { + assertAttribute(actualList.get(i), expectedList.get(i), elemType, attrName); + } + } + break; + case STRUCT: + StructInstance structInstance = (StructInstance) expected; + AtlasStruct newStructVal = (AtlasStruct) actual; + assertAttributes(newStructVal, structInstance); + break; + default: + Assert.fail("Unknown type category"); + } + } + + @Test(dependsOnMethods = "testCreate") + public void testArrayUpdate() throws Exception { + //clear state + init(); + + AtlasEntity entityClone = new AtlasEntity(entityCreated); + + List<AtlasEntity> employees = (List<AtlasEntity>) entityClone.getAttribute("employees"); + + List<AtlasEntity> updatedEmployees = new ArrayList<>(employees); + clearSubOrdinates(updatedEmployees, 1); + entityClone.setAttribute("employees", updatedEmployees); + + EntityMutationResponse response = entityStore.createOrUpdate(entityClone); + + List<AtlasEntityHeader> entitiesUpdated = response.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE); + Assert.assertNotNull(entitiesUpdated); + Assert.assertEquals(entitiesUpdated.size(), 5); + + AtlasEntityHeader deptEntity = entitiesUpdated.get(0); + + //TODO : Change to new API after new instance GET API is ready. + ITypedReferenceableInstance instance = metadataService.getEntityDefinition(deptEntity.getGuid()); + assertAttributes(deptEntity, instance); + + } + + private void clearSubOrdinates(List<AtlasEntity> updatedEmployees, int index) { + List<AtlasEntity> subOrdinates = (List<AtlasEntity>) updatedEmployees.get(index).getAttribute("subordinates"); + List<AtlasEntity> subOrdClone = new ArrayList<>(subOrdinates); + subOrdClone.remove(index); + + updatedEmployees.get(index).setAttribute("subordinates", subOrdClone); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java new file mode 100644 index 0000000..1fc11a2 --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import org.apache.atlas.metrics.Metrics; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +public class RequestContextV1 { + private static final Logger LOG = LoggerFactory.getLogger(RequestContextV1.class); + + private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>(); + + private Set<String> createdEntityIds = new LinkedHashSet<>(); + private Set<String> updatedEntityIds = new LinkedHashSet<>(); + private Set<String> deletedEntityIds = new LinkedHashSet<>(); + + private String user; + private final long requestTime; + + TypeSystem typeSystem = TypeSystem.getInstance(); + private Metrics metrics = new Metrics(); + + private RequestContextV1() { + requestTime = System.currentTimeMillis(); + } + + //To handle gets from background threads where createContext() is not called + //createContext called for every request in the filter + public static RequestContextV1 get() { + RequestContextV1 ret = CURRENT_CONTEXT.get(); + + if (ret == null) { + ret = new RequestContextV1(); + CURRENT_CONTEXT.set(ret); + } + + return ret; + } + public static void clear() { + CURRENT_CONTEXT.remove(); + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public void recordEntityCreate(Collection<String> createdEntityIds) { + this.createdEntityIds.addAll(createdEntityIds); + } + + public void recordEntityCreate(String createdEntityId) { + this.createdEntityIds.add(createdEntityId); + } + + public void recordEntityUpdate(Collection<String> updatedEntityIds) { + this.updatedEntityIds.addAll(updatedEntityIds); + } + + public void recordEntityUpdate(String entityId) { + this.updatedEntityIds.add(entityId); + } + public void recordEntityDelete(String entityId) { + deletedEntityIds.add(entityId); + } + + public Collection<String> getCreatedEntityIds() { + return createdEntityIds; + } + + public Collection<String> getUpdatedEntityIds() { + return updatedEntityIds; + } + + public Collection<String> getDeletedEntityIds() { + return deletedEntityIds; + } + + public long getRequestTime() { + return requestTime; + } + + public boolean isDeletedEntity(String entityGuid) { + return deletedEntityIds.contains(entityGuid); + } + + public static Metrics getMetrics() { + return get().metrics; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java index c4be236..74ab740 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java +++ b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java @@ -109,9 +109,9 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { } private AtlasEntity.Status convertState(EntityState state){ - Status status = Status.STATUS_ACTIVE; + Status status = Status.ACTIVE; if(state != null && state.equals(EntityState.DELETED)){ - status = Status.STATUS_DELETED; + status = Status.DELETED; } LOG.debug("Setting state to {}", state); return status; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java index ad16be7..2b13552 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java +++ b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java @@ -126,13 +126,13 @@ public class AtlasInstanceRestAdapters { for (String guid : result.getCreatedEntities()) { AtlasEntityHeader header = new AtlasEntityHeader(); header.setGuid(guid); - response.addEntity(EntityMutations.EntityOperation.CREATE_OR_UPDATE, header); + response.addEntity(EntityMutations.EntityOperation.CREATE, header); } for (String guid : result.getUpdateEntities()) { AtlasEntityHeader header = new AtlasEntityHeader(); header.setGuid(guid); - response.addEntity(EntityMutations.EntityOperation.CREATE_OR_UPDATE, header); + response.addEntity(EntityMutations.EntityOperation.UPDATE, header); } for (String guid : result.getDeletedEntities()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java index 3565ab3..8777510 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java +++ b/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java @@ -124,20 +124,20 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { if (MapUtils.isNotEmpty(attributes)) { ret = new HashMap<>(); - for (AtlasStructDef.AtlasAttributeDef attrDef : getAttributeDefs(structType)) { - AtlasType attrType = structType.getAttributeType(attrDef.getName()); + for (AtlasStructType.AtlasAttribute attr : getAttributes(structType)) { + AtlasType attrType = structType.getAttributeType(attr.getAttributeDef().getName()); if (attrType == null) { - LOG.warn("ignored attribute {}.{}: failed to find AtlasType", structType.getTypeName(), attrDef.getName()); + LOG.warn("ignored attribute {}.{}: failed to find AtlasType", structType.getTypeName(), attr.getAttributeDef().getName()); continue; } AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - Object v2Value = attributes.get(attrDef.getName()); + Object v2Value = attributes.get(attr.getAttributeDef().getName()); Object v1Value = attrConverter.fromV2ToV1(v2Value, attrType); - ret.put(attrDef.getName(), v1Value); + ret.put(attr.getAttributeDef().getName(), v1Value); } } @@ -150,29 +150,27 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { if (MapUtils.isNotEmpty(attributes)) { ret = new HashMap<>(); - for (AtlasStructDef.AtlasAttributeDef attrDef : getAttributeDefs(structType)) { - AtlasType attrType = structType.getAttributeType(attrDef.getName()); + for (AtlasStructType.AtlasAttribute attr : getAttributes(structType)) { + AtlasType attrType = structType.getAttributeType(attr.getAttributeDef().getName()); AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - Object v1Value = attributes.get(attrDef.getName()); + Object v1Value = attributes.get(attr.getAttributeDef().getName()); Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType); - ret.put(attrDef.getName(), v2Value); + ret.put(attr.getAttributeDef().getName(), v2Value); } } return ret; } - private Collection<AtlasAttributeDef> getAttributeDefs(AtlasStructType structType) { - Collection<AtlasAttributeDef> ret = null; + private Collection<AtlasStructType.AtlasAttribute> getAttributes(AtlasStructType structType) { + Collection<AtlasStructType.AtlasAttribute> ret = null; - if (structType.getTypeCategory() == TypeCategory.STRUCT) { - ret = structType.getStructDef().getAttributeDefs(); - } else if (structType.getTypeCategory() == TypeCategory.CLASSIFICATION) { - ret = ((AtlasClassificationType)structType).getAllAttributeDefs().values(); - } else if (structType.getTypeCategory() == TypeCategory.ENTITY) { - ret = ((AtlasEntityType)structType).getAllAttributeDefs().values(); + if (structType.getTypeCategory() == TypeCategory.STRUCT + || structType.getTypeCategory() == TypeCategory.CLASSIFICATION + || structType.getTypeCategory() == TypeCategory.ENTITY) { + ret = structType.getAllAttributes().values(); } else { ret = Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/main/java/org/apache/atlas/web/rest/EntitiesREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntitiesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntitiesREST.java index f6acd07..21f8977 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntitiesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntitiesREST.java @@ -104,7 +104,7 @@ public class EntitiesREST { /******* * Entity Updation - Allows full update of the specified entities. * Any associations like Classifications, Business Terms will have to be handled through the respective APIs - * Null updates are supported i.e Set an attribute value to Null if its an optional attribute + * Null updates are supported i.e Set an attribute value to Null if its an optional attribute *******/ @PUT @Consumes(Servlets.JSON_MEDIA_TYPE) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java index 54ca236..58ddf8b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java +++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java @@ -69,7 +69,7 @@ public final class LineageUtils { TypeSystem.IdType idType = TypeSystem.getInstance().getIdType(); vertexIdMap.put(idType.idAttrName(), guid); - vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.STATUS_ACTIVE) ? "ACTIVE" : "DELETED"); + vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED"); vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName()); Map<String, Object> values = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntitiesREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntitiesREST.java b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntitiesREST.java index 265b650..90a46f8 100644 --- a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntitiesREST.java +++ b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntitiesREST.java @@ -96,7 +96,7 @@ public class TestEntitiesREST { entities.add(tableEntity); EntityMutationResponse response = entitiesREST.createOrUpdate(entities); - List<AtlasEntityHeader> guids = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE); + List<AtlasEntityHeader> guids = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); Assert.assertNotNull(guids); Assert.assertEquals(guids.size(), 3); @@ -125,7 +125,7 @@ public class TestEntitiesREST { newEntities.add(newTableEntity); EntityMutationResponse response2 = entitiesREST.createOrUpdate(newEntities); - List<AtlasEntityHeader> newGuids = response2.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE); + List<AtlasEntityHeader> newGuids = response2.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); Assert.assertNotNull(newGuids); Assert.assertEquals(newGuids.size(), 3); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java index 2a75773..ee36fdf 100644 --- a/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java +++ b/webapp/src/test/java/org/apache/atlas/web/adapters/TestEntityREST.java @@ -79,7 +79,7 @@ public class TestEntityREST { final EntityMutationResponse response = entityREST.createOrUpdate(dbEntity); Assert.assertNotNull(response); - List<AtlasEntityHeader> entitiesMutated = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE); + List<AtlasEntityHeader> entitiesMutated = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); Assert.assertNotNull(entitiesMutated); Assert.assertEquals(entitiesMutated.size(), 1); @@ -160,7 +160,7 @@ public class TestEntityREST { dbEntity.setAttribute(TestUtilsV2.NAME, updatedDBName); final EntityMutationResponse response = entityREST.partialUpdateByUniqueAttribute(TestUtilsV2.DATABASE_TYPE, TestUtilsV2.NAME, prevDBName, dbEntity); - String dbGuid = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).get(0).getGuid(); + String dbGuid = response.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).get(0).getGuid(); Assert.assertTrue(AtlasEntity.isAssigned(dbGuid)); //Get By unique attribute http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java index 44d8a11..51be64c 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java @@ -225,9 +225,9 @@ public abstract class BaseResourceIT { entity = entitiesClientV2.updateEntity(atlasEntity); } assertNotNull(entity); - assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); - assertTrue(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).size() > 0); - return entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).get(0); + assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE)); + assertTrue(entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).size() > 0); + return entity.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).get(0); } catch (AtlasServiceException e) { LOG.error("Entity {} failed", update ? "update" : "creation", entity); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/test/java/org/apache/atlas/web/resources/EntityDiscoveryJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityDiscoveryJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityDiscoveryJerseyResourceIT.java index 2ade5b0..2bbe10a 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityDiscoveryJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityDiscoveryJerseyResourceIT.java @@ -75,7 +75,7 @@ public class EntityDiscoveryJerseyResourceIT extends BaseResourceIT { AtlasEntityHeaderWithAssociations dbEntity = entities.get(0); assertEquals(dbEntity.getTypeName(), DATABASE_TYPE); assertEquals(dbEntity.getDisplayText(), dbName); - assertEquals(dbEntity.getStatus(), Status.STATUS_ACTIVE); + assertEquals(dbEntity.getStatus(), Status.ACTIVE); assertNotNull(dbEntity.getGuid()); assertNull(searchResult.getAttributes()); assertNull(searchResult.getFullTextResult()); @@ -137,7 +137,7 @@ public class EntityDiscoveryJerseyResourceIT extends BaseResourceIT { AtlasEntityHeaderWithAssociations dbEntity = entities.get(0); assertEquals(dbEntity.getTypeName(), DATABASE_TYPE); assertEquals(dbEntity.getDisplayText(), dbName); - assertEquals(dbEntity.getStatus(), Status.STATUS_ACTIVE); + assertEquals(dbEntity.getStatus(), Status.ACTIVE); assertNotNull(dbEntity.getGuid()); assertNull(searchResult.getAttributes()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2f1cb57a/webapp/src/test/java/org/apache/atlas/web/resources/EntityV2JerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityV2JerseyResourceIT.java index d7702e2..74338fd 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityV2JerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityV2JerseyResourceIT.java @@ -138,7 +138,7 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { EntityMutationResponse entity = entitiesClientV2.createEntity(hiveTableInstanceV2); assertNotNull(entity); - assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); + assertNotNull(entity.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE)); results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, DATABASE_NAME)); assertEquals(results.length(), 1); } @@ -175,9 +175,9 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { instance.setAttribute("name", randomString()); EntityMutationResponse mutationResponse = entitiesClientV2.createEntity(instance); assertNotNull(mutationResponse); - assertNotNull(mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); - assertEquals(mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).size(),1 ); - String guid = mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).get(0).getGuid(); + assertNotNull(mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE)); + assertEquals(mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE).size(),1 ); + String guid = mutationResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE).get(0).getGuid(); //update type - add attribute entityDef = AtlasTypeUtil.createClassTypeDef(entityDef.getName(), ImmutableSet.<String>of(), @@ -352,7 +352,7 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { entityByGuid.setAttribute(property, value); EntityMutationResponse response = entitiesClientV2.updateEntity(entityByGuid); assertNotNull(response); - assertNotNull(response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); + assertNotNull(response.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE)); } private AtlasEntity createHiveDB() { @@ -575,8 +575,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { LOG.debug("Updating entity= " + tableUpdated); EntityMutationResponse updateResult = entitiesClientV2.updateEntity(tableEntity.getGuid(), tableUpdated); assertNotNull(updateResult); - assertNotNull(updateResult.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); - assertTrue(updateResult.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).size() > 0); + assertNotNull(updateResult.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE)); + assertTrue(updateResult.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).size() > 0); AtlasEntity entityByGuid = entitiesClientV2.getEntityByGuid(tableEntity.getGuid()); assertNotNull(entityByGuid); @@ -593,8 +593,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { EntityMutationResponse updateResponse = entitiesClientV2.updateEntityByAttribute(BaseResourceIT.HIVE_TABLE_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, (String) tableEntity.getAttribute("name"), tableUpdated); assertNotNull(updateResponse); - assertNotNull(updateResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); - assertTrue(updateResponse.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).size() > 0); + assertNotNull(updateResponse.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE)); + assertTrue(updateResponse.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).size() > 0); entityByGuid = entitiesClientV2.getEntityByGuid(tableEntity.getGuid()); assertNotNull(entityByGuid); @@ -623,8 +623,8 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { tableEntity.setAttribute("columns", columns); EntityMutationResponse updateEntityResult = entitiesClientV2.updateEntity(tableEntity); assertNotNull(updateEntityResult); - assertNotNull(updateEntityResult.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE)); - assertEquals(updateEntityResult.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE_OR_UPDATE).size(), 3); + assertNotNull(updateEntityResult.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE)); + assertEquals(updateEntityResult.getEntitiesByOperation(EntityMutations.EntityOperation.UPDATE).size(), 3); AtlasEntity entityByGuid = entitiesClientV2.getEntityByGuid(tableEntity.getGuid()); List<AtlasEntity> refs = (List<AtlasEntity>) entityByGuid.getAttribute("columns");
