http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java deleted file mode 100644 index 67631bf..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java +++ /dev/null @@ -1,539 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import static org.apache.atlas.repository.Constants.TYPE_CATEGORY_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.VERTEX_TYPE; - -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContextV1; -import org.apache.atlas.annotation.GraphTransaction; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.listener.TypeDefChangeListener; -import org.apache.atlas.model.typedef.*; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.*; -import org.apache.atlas.repository.store.graph.*; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import javax.inject.Singleton; - - -/** - * Graph persistence store for TypeDef - v1 - */ -@Singleton -@Component -public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { - private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStoreV1.class); - - protected final AtlasGraph atlasGraph; - - @Inject - public AtlasTypeDefGraphStoreV1(AtlasTypeRegistry typeRegistry, - Set<TypeDefChangeListener> typeDefChangeListeners, - AtlasGraph atlasGraph) { - super(typeRegistry, typeDefChangeListeners); - this.atlasGraph = atlasGraph; - - LOG.debug("<== AtlasTypeDefGraphStoreV1()"); - } - - @Override - protected AtlasDefStore<AtlasEnumDef> getEnumDefStore(AtlasTypeRegistry typeRegistry) { - return new AtlasEnumDefStoreV1(this, typeRegistry); - } - - @Override - protected AtlasDefStore<AtlasStructDef> getStructDefStore(AtlasTypeRegistry typeRegistry) { - return new AtlasStructDefStoreV1(this, typeRegistry); - } - - @Override - protected AtlasDefStore<AtlasClassificationDef> getClassificationDefStore(AtlasTypeRegistry typeRegistry) { - return new AtlasClassificationDefStoreV1(this, typeRegistry); - } - - @Override - protected AtlasDefStore<AtlasEntityDef> getEntityDefStore(AtlasTypeRegistry typeRegistry) { - return new AtlasEntityDefStoreV1(this, typeRegistry); - } - - @Override - protected AtlasDefStore<AtlasRelationshipDef> getRelationshipDefStore(AtlasTypeRegistry typeRegistry) { - return new AtlasRelationshipDefStoreV1(this, typeRegistry); - } - - - @Override - @GraphTransaction - public void init() throws AtlasBaseException { - LOG.info("==> AtlasTypeDefGraphStoreV1.init()"); - - super.init(); - - LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); - } - - AtlasGraph getAtlasGraph() { return atlasGraph; } - - @VisibleForTesting - public AtlasVertex findTypeVertexByName(String typeName) { - Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) - .has(Constants.TYPENAME_PROPERTY_KEY, typeName) - .vertices().iterator(); - - return (results != null && results.hasNext()) ? (AtlasVertex) results.next() : null; - } - - AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) { - Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) - .has(Constants.TYPENAME_PROPERTY_KEY, typeName) - .has(TYPE_CATEGORY_PROPERTY_KEY, category) - .vertices().iterator(); - - return (results != null && results.hasNext()) ? (AtlasVertex) results.next() : null; - } - - AtlasVertex findTypeVertexByGuid(String typeGuid) { - Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) - .has(Constants.GUID_PROPERTY_KEY, typeGuid) - .vertices().iterator(); - - return (vertices != null && vertices.hasNext()) ? vertices.next() : null; - } - - AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) { - Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) - .has(Constants.GUID_PROPERTY_KEY, typeGuid) - .has(TYPE_CATEGORY_PROPERTY_KEY, category) - .vertices().iterator(); - - return (vertices != null && vertices.hasNext()) ? vertices.next() : null; - } - - Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) { - - return (Iterator<AtlasVertex>) atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) - .has(TYPE_CATEGORY_PROPERTY_KEY, category) - .vertices().iterator(); - } - - AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) { - // Validate all the required checks - Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty"); - - AtlasVertex ret = atlasGraph.addVertex(); - - if (StringUtils.isBlank(typeDef.getTypeVersion())) { - typeDef.setTypeVersion("1.0"); - } - - if (typeDef.getVersion() == null) { - typeDef.setVersion(1L); - } - - if (StringUtils.isBlank(typeDef.getGuid())) { - typeDef.setGuid(UUID.randomUUID().toString()); - } - - if (typeDef.getCreateTime() == null) { - typeDef.setCreateTime(new Date()); - } - - if (typeDef.getUpdateTime() == null) { - typeDef.setUpdateTime(new Date()); - } - - ret.setProperty(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); // Mark as type vertex - ret.setProperty(TYPE_CATEGORY_PROPERTY_KEY, getTypeCategory(typeDef)); - - ret.setProperty(Constants.TYPENAME_PROPERTY_KEY, typeDef.getName()); - ret.setProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, - StringUtils.isNotBlank(typeDef.getDescription()) ? typeDef.getDescription() : typeDef.getName()); - ret.setProperty(Constants.TYPEVERSION_PROPERTY_KEY, typeDef.getTypeVersion()); - ret.setProperty(Constants.GUID_PROPERTY_KEY, typeDef.getGuid()); - ret.setProperty(Constants.CREATED_BY_KEY, getCurrentUser()); - ret.setProperty(Constants.TIMESTAMP_PROPERTY_KEY, typeDef.getCreateTime().getTime()); - ret.setProperty(Constants.MODIFIED_BY_KEY, getCurrentUser()); - ret.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, typeDef.getUpdateTime().getTime()); - ret.setProperty(Constants.VERSION_PROPERTY_KEY, typeDef.getVersion()); - ret.setProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, AtlasType.toJson(typeDef.getOptions())); - - return ret; - } - - void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) { - if (!isTypeVertex(vertex)) { - LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex); - - return; - } - - updateVertexProperty(vertex, Constants.GUID_PROPERTY_KEY, typeDef.getGuid()); - /* - * rename of a type is supported yet - as the typename is used to in the name of the edges from this vertex - * To support rename of types, he edge names should be derived from an internal-name - not directly the typename - * - updateVertexProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeDef.getName()); - */ - updateVertexProperty(vertex, Constants.TYPEDESCRIPTION_PROPERTY_KEY, typeDef.getDescription()); - updateVertexProperty(vertex, Constants.TYPEVERSION_PROPERTY_KEY, typeDef.getTypeVersion()); - updateVertexProperty(vertex, Constants.TYPEOPTIONS_PROPERTY_KEY, AtlasType.toJson(typeDef.getOptions())); - - markVertexUpdated(vertex); - } - - void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException { - Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); - - for (AtlasEdge edge : edges) { - atlasGraph.removeEdge(edge); - } - } - - /** - * Look to see if there are any IN edges with the supplied label - * @param vertex - * @param label - * @return - * @throws AtlasBaseException - */ - boolean hasIncomingEdgesWithLabel(AtlasVertex vertex, String label) throws AtlasBaseException { - boolean foundEdges = false; - Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); - - while (inEdges.hasNext()) { - AtlasEdge edge = inEdges.next(); - - if (label.equals(edge.getLabel())) { - foundEdges = true; - break; - } - } - return foundEdges; - } - - void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException { - Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); - - if (inEdges.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES); - } - - Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); - - for (AtlasEdge edge : edges) { - atlasGraph.removeEdge(edge); - } - atlasGraph.removeVertex(vertex); - } - - void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) { - String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); - String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); - String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class); - String guid = vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); - String createdBy = vertex.getProperty(Constants.CREATED_BY_KEY, String.class); - String updatedBy = vertex.getProperty(Constants.MODIFIED_BY_KEY, String.class); - Long createTime = vertex.getProperty(Constants.TIMESTAMP_PROPERTY_KEY, Long.class); - Long updateTime = vertex.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); - Object versionObj = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Object.class); - String options = vertex.getProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, String.class); - - Long version = null; - - if(versionObj instanceof Number) { - version = ((Number)versionObj).longValue(); - } else if (versionObj != null) { - version = Long.valueOf(versionObj.toString()); - } else { - version = Long.valueOf(0); - } - - - typeDef.setName(name); - typeDef.setDescription(description); - typeDef.setTypeVersion(typeVersion); - typeDef.setGuid(guid); - typeDef.setCreatedBy(createdBy); - typeDef.setUpdatedBy(updatedBy); - - if (createTime != null) { - typeDef.setCreateTime(new Date(createTime)); - } - - if (updateTime != null) { - typeDef.setUpdateTime(new Date(updateTime)); - } - - if (version != null) { - typeDef.setVersion(version); - } - - if (options != null) { - typeDef.setOptions(AtlasType.fromJson(options, Map.class)); - } - } - - boolean isTypeVertex(AtlasVertex vertex) { - String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class); - - return VERTEX_TYPE.equals(vertexType); - } - - @VisibleForTesting - public boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) { - boolean ret = false; - - if (isTypeVertex(vertex)) { - Object objTypeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, Object.class); - - TypeCategory vertexCategory = null; - - if(objTypeCategory instanceof TypeCategory) { - vertexCategory = (TypeCategory) objTypeCategory; - } else if (objTypeCategory != null) { - vertexCategory = TypeCategory.valueOf(objTypeCategory.toString()); - } - - ret = category.equals(vertexCategory); - } - - return ret; - } - - boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) { - boolean ret = false; - - if (isTypeVertex(vertex)) { - TypeCategory vertexCategory = vertex.getProperty(TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); - - for (TypeCategory category : categories) { - if (category.equals(vertexCategory)) { - ret = true; - - break; - } - } - } - - return ret; - } - - AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { - AtlasEdge ret = null; - Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); - - for (AtlasEdge edge : edges) { - if (edge.getInVertex().getId().equals(inVertex.getId())) { - ret = edge; - break; - } - } - - if (ret == null) { - ret = addEdge(outVertex, inVertex, edgeLabel); - } - - return ret; - } - - AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { - - return atlasGraph.addEdge(outVertex, inVertex, edgeLabel); - } - - void removeEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { - Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); - - for (AtlasEdge edge : edges) { - if (edge.getInVertex().getId().equals(inVertex.getId())) { - atlasGraph.removeEdge(edge); - } - } - } - - void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory) - throws AtlasBaseException { - Set<String> currentSuperTypes = getSuperTypeNames(vertex); - - if (CollectionUtils.isNotEmpty(superTypes)) { - if (! superTypes.containsAll(currentSuperTypes)) { - throw new AtlasBaseException(AtlasErrorCode.SUPERTYPE_REMOVAL_NOT_SUPPORTED); - } - - for (String superType : superTypes) { - AtlasVertex superTypeVertex = findTypeVertexByNameAndCategory(superType, typeCategory); - - getOrCreateEdge(vertex, superTypeVertex, AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL); - } - } else if (CollectionUtils.isNotEmpty(currentSuperTypes)) { - throw new AtlasBaseException(AtlasErrorCode.SUPERTYPE_REMOVAL_NOT_SUPPORTED); - } - } - - public void createEntityTypeEdges(AtlasVertex classificationVertex, Set<String> entityTypes) throws AtlasBaseException { - Set<String> currentEntityTypes = getEntityTypeNames(classificationVertex); - String classificationTypeName = classificationVertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); - - if (CollectionUtils.isNotEmpty(entityTypes)) { - if (!entityTypes.containsAll(currentEntityTypes)) { - throw new AtlasBaseException(AtlasErrorCode.ENTITYTYPE_REMOVAL_NOT_SUPPORTED, classificationTypeName); - } - - for (String entityType : entityTypes) { - AtlasVertex entityTypeVertex = findTypeVertexByNameAndCategory(entityType, TypeCategory.CLASS); - if (entityTypeVertex == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATIONDEF_INVALID_ENTITYTYPES, classificationTypeName,entityType); - - } - getOrCreateEdge(classificationVertex, entityTypeVertex, AtlasGraphUtilsV1.ENTITYTYPE_EDGE_LABEL); - } - } else if (CollectionUtils.isNotEmpty(currentEntityTypes)) { // remove the restrictions, if present - for (String entityType : currentEntityTypes) { - AtlasVertex entityTypeVertex = findTypeVertexByNameAndCategory(entityType, TypeCategory.CLASS); - - if (entityTypeVertex == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATIONDEF_INVALID_ENTITYTYPES, classificationTypeName,entityType); - - } - - removeEdge(classificationVertex, entityTypeVertex, AtlasGraphUtilsV1.ENTITYTYPE_EDGE_LABEL); - } - - } - } - - Set<String> getSuperTypeNames(AtlasVertex vertex) { - return getTypeNamesFromEdges(vertex,AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL); - } - - Set<String> getEntityTypeNames(AtlasVertex vertex) { - return getTypeNamesFromEdges(vertex,AtlasGraphUtilsV1.ENTITYTYPE_EDGE_LABEL); - } - - /** - * Get the typename properties from the edges, that are associated with the vertex and have the supplied edge label. - * @param vertex - * @param edgeLabel - * @return set of type names - */ - private Set<String> getTypeNamesFromEdges(AtlasVertex vertex,String edgeLabel) { - Set<String> ret = new HashSet<>(); - Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); - - for (AtlasEdge edge : edges) { - ret.add(edge.getInVertex().getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class)); - } - - return ret; - } - - TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) { - switch (typeDef.getCategory()) { - case ENTITY: - return TypeCategory.CLASS; - - case CLASSIFICATION: - return TypeCategory.TRAIT; - - case STRUCT: - return TypeCategory.STRUCT; - - case ENUM: - return TypeCategory.ENUM; - - case RELATIONSHIP: - return TypeCategory.RELATIONSHIP; - } - - return null; - } - - /* - * update the given vertex property, if the new value is not-blank - */ - private void updateVertexProperty(AtlasVertex vertex, String propertyName, String newValue) { - if (StringUtils.isNotBlank(newValue)) { - String currValue = vertex.getProperty(propertyName, String.class); - - if (!StringUtils.equals(currValue, newValue)) { - vertex.setProperty(propertyName, newValue); - } - } - } - - /* - * update the given vertex property, if the new value is not-null - */ - private void updateVertexProperty(AtlasVertex vertex, String propertyName, Date newValue) { - if (newValue != null) { - Number currValue = vertex.getProperty(propertyName, Number.class); - - if (currValue == null || !currValue.equals(newValue.getTime())) { - vertex.setProperty(propertyName, newValue.getTime()); - } - } - } - - /* - * increment the version value for this vertex - */ - private void markVertexUpdated(AtlasVertex vertex) { - Date now = new Date(); - Number currVersion = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Number.class); - long newVersion = currVersion == null ? 1 : (currVersion.longValue() + 1); - - vertex.setProperty(Constants.MODIFIED_BY_KEY, getCurrentUser()); - vertex.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, now.getTime()); - vertex.setProperty(Constants.VERSION_PROPERTY_KEY, newVersion); - } - - private String getCurrentUser() { - String ret = RequestContextV1.get().getUser(); - - if (StringUtils.isBlank(ret)) { - ret = System.getProperty("user.name"); - - if (StringUtils.isBlank(ret)) { - ret = "atlas"; - } - } - - return ret; - } -}
http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java deleted file mode 100644 index b6d82dd..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - - -import org.apache.atlas.model.instance.EntityMutations.EntityOperation; -import org.apache.atlas.model.typedef.AtlasStructDef; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.repository.graphdb.AtlasEdge; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; - - -import java.util.Objects; - -public class AttributeMutationContext { - private EntityOperation op; - /** - * Atlas Attribute - */ - - private AtlasAttribute attribute; - - /** - * Overriding type for which elements are being mapped - */ - private AtlasType currentElementType; - - /** - * Current attribute value/entity/Struct instance - */ - private Object value; - - private String vertexProperty; - - /** - * - * The vertex which corresponds to the entity/struct for which we are mapping a complex attributes like struct, traits - */ - AtlasVertex referringVertex; - - /** - * The current edge(in case of updates) from the parent entity/struct to the complex attribute like struct, trait - */ - AtlasEdge existingEdge; - - public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value) { - this(op, referringVertex, attribute, value, attribute.getVertexPropertyName(), null, null); - } - - public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value, - String vertexProperty, AtlasType currentElementType, AtlasEdge currentEdge) { - this.op = op; - this.referringVertex = referringVertex; - this.attribute = attribute; - this.value = value; - this.vertexProperty = vertexProperty; - this.currentElementType = currentElementType; - this.existingEdge = currentEdge; - } - - @Override - public int hashCode() { - return Objects.hash(op, referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } else if (obj == this) { - return true; - } else if (obj.getClass() != getClass()) { - return false; - } else { - AttributeMutationContext rhs = (AttributeMutationContext) obj; - return Objects.equals(op, rhs.op) - && Objects.equals(referringVertex, rhs.referringVertex) - && Objects.equals(attribute, rhs.attribute) - && Objects.equals(value, rhs.value) - && Objects.equals(vertexProperty, rhs.vertexProperty) - && Objects.equals(currentElementType, rhs.currentElementType) - && Objects.equals(existingEdge, rhs.existingEdge); - } - } - - - public AtlasStructType getParentType() { - return attribute.getDefinedInType(); - } - - public AtlasStructDef getStructDef() { - return attribute.getDefinedInDef(); - } - - public AtlasAttributeDef getAttributeDef() { - return attribute.getAttributeDef(); - } - - public AtlasType getAttrType() { - return currentElementType == null ? attribute.getAttributeType() : currentElementType; - } - - public AtlasType getCurrentElementType() { - return currentElementType; - } - - public Object getValue() { - return value; - } - - public String getVertexProperty() { return vertexProperty; } - - public AtlasVertex getReferringVertex() { return referringVertex; } - - public AtlasEdge getCurrentEdge() { - return existingEdge; - } - - public void setElementType(final AtlasType attrType) { - this.currentElementType = attrType; - } - - public AtlasAttribute getAttribute() { - return attribute; - } - - public EntityOperation getOp() { - return op; - } - - public void setExistingEdge(AtlasEdge existingEdge) { this.existingEdge = existingEdge; } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java deleted file mode 100644 index e17daf6..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContextV1; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.BulkImporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -@Component -public class BulkImporterImpl implements BulkImporter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); - - private final AtlasEntityStore entityStore; - - @Inject - public BulkImporterImpl(AtlasEntityStore entityStore) { - this.entityStore = entityStore; - } - - @Override - public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> bulkImport()"); - } - - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - EntityMutationResponse ret = new EntityMutationResponse(); - ret.setGuidAssignments(new HashMap<String, String>()); - - Set<String> processedGuids = new HashSet<>(); - float currentPercent = 0f; - List<String> residualList = new ArrayList<>(); - - EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList); - - while (entityImportStreamWithResidualList.hasNext()) { - AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo(); - AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - - if (entity == null) { - continue; - } - - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream); - - try { - EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream); - - if (resp.getGuidAssignments() != null) { - ret.getGuidAssignments().putAll(resp.getGuidAssignments()); - } - - currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent); - - entityStream.onImportComplete(entity.getGuid()); - } catch (AtlasBaseException e) { - if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw e; - } - } catch (Throwable e) { - AtlasBaseException abe = new AtlasBaseException(e); - - if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) { - throw abe; - } - } finally { - RequestContextV1.clear(); - } - } - - importResult.getProcessedEntities().addAll(processedGuids); - LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size()); - - return ret; - } - - - private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) { - if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) { - return false; - } - - lineageList.add(guid); - - return true; - } - - private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity, - EntityMutationResponse resp, - AtlasImportResult importResult, - Set<String> processedGuids, - int currentIndex, int streamSize, float currentPercent) { - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); - updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - - String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid()); - - return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported); - } - - @VisibleForTesting - static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { - final double tolerance = 0.000001; - final int MAX_PERCENT = 100; - - float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize); - boolean updateLog = Double.compare(percent, currentPercent) > tolerance; - float updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent); - - if (updateLog) { - log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo); - } - - return updatedPercent; - } - - private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { - if (list == null) { - return; - } - - for (AtlasEntityHeader h : list) { - if (processedGuids.contains(h.getGuid())) { - continue; - } - - processedGuids.add(h.getGuid()); - importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName())); - } - } - - private static class EntityImportStreamWithResidualList { - private final EntityImportStream stream; - private final List<String> residualList; - private boolean navigateResidualList; - private int currentResidualListIndex; - - - public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { - this.stream = stream; - this.residualList = residualList; - this.navigateResidualList = false; - this.currentResidualListIndex = 0; - } - - public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - if (navigateResidualList == false) { - return stream.getNextEntityWithExtInfo(); - } else { - stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++)); - return stream.getNextEntityWithExtInfo(); - } - } - - public boolean hasNext() { - if (!navigateResidualList) { - boolean streamHasNext = stream.hasNext(); - navigateResidualList = (streamHasNext == false); - return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size()); - } else { - return (currentResidualListIndex < residualList.size()); - } - } - - public int getStreamSize() { - return stream.size() + residualList.size(); - } - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 69a4758..4a0924bb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -20,7 +20,7 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContextV1; +import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; @@ -35,6 +35,8 @@ import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasEntityType; @@ -61,10 +63,10 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.graph.GraphHelper.*; -import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getIdFromEdge; -import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getQualifiedAttributePropertyKey; -import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getState; -import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.isReference; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromEdge; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getQualifiedAttributePropertyKey; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; public abstract class DeleteHandlerV1 { @@ -91,11 +93,11 @@ public abstract class DeleteHandlerV1 { * @throws AtlasException */ public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws AtlasBaseException { - RequestContextV1 requestContext = RequestContextV1.get(); + RequestContext requestContext = RequestContext.get(); Set<AtlasVertex> deletionCandidateVertices = new HashSet<>(); for (AtlasVertex instanceVertex : instanceVertices) { - String guid = AtlasGraphUtilsV1.getIdFromVertex(instanceVertex); + String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex); AtlasEntity.Status state = getState(instanceVertex); if (state == DELETED || requestContext.isDeletedEntity(guid)) { @@ -295,7 +297,7 @@ public abstract class DeleteHandlerV1 { AtlasVertex referencedVertex = entityRetriever.getReferencedEntityVertex(edge, relationshipDirection, entityVertex); if (referencedVertex != null) { - RequestContextV1 requestContext = RequestContextV1.get(); + RequestContext requestContext = RequestContext.get(); if (!requestContext.isUpdatedEntity(GraphHelper.getGuid(referencedVertex))) { GraphHelper.setProperty(referencedVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime()); @@ -407,7 +409,7 @@ public abstract class DeleteHandlerV1 { addToPropagatedTraitNames(propagatedEntityVertex, classificationName); // record add propagation details to send notifications at the end - RequestContextV1 context = RequestContextV1.get(); + RequestContext context = RequestContext.get(); AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); context.recordAddedPropagation(entityGuid, classification); @@ -475,7 +477,7 @@ public abstract class DeleteHandlerV1 { ret.add(entityVertex); // record remove propagation details to send notifications at the end - RequestContextV1.get().recordRemovedPropagation(getGuid(entityVertex), classification); + RequestContext.get().recordRemovedPropagation(getGuid(entityVertex), classification); deletePropagatedEdge(propagatedEdge); } @@ -490,7 +492,7 @@ public abstract class DeleteHandlerV1 { String classificationName = getClassificationName(classificationVertex); AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); String entityGuid = getClassificationEntityGuid(classificationVertex); - RequestContextV1 context = RequestContextV1.get(); + RequestContext context = RequestContext.get(); for (AtlasVertex entityVertex : entityVertices) { AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, entityGuid); @@ -575,7 +577,7 @@ public abstract class DeleteHandlerV1 { } public void deletePropagatedEdge(AtlasEdge edge) throws AtlasBaseException { - String classificationName = AtlasGraphUtilsV1.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class); + String classificationName = AtlasGraphUtilsV2.getProperty(edge, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class); AtlasVertex entityVertex = edge.getOutVertex(); if (LOG.isDebugEnabled()) { @@ -738,7 +740,7 @@ public abstract class DeleteHandlerV1 { final String outId = GraphHelper.getGuid(outVertex); final Status state = getState(outVertex); - if (state == DELETED || (outId != null && RequestContextV1.get().isDeletedEntity(outId))) { + if (state == DELETED || (outId != null && RequestContext.get().isDeletedEntity(outId))) { //If the reference vertex is marked for deletion, skip updating the reference return; } @@ -832,7 +834,7 @@ public abstract class DeleteHandlerV1 { if (edge != null) { deleteEdge(edge, isInternalType(inVertex) && isInternalType(outVertex)); - RequestContextV1 requestContext = RequestContextV1.get(); + RequestContext requestContext = RequestContext.get(); if (! requestContext.isUpdatedEntity(outId)) { GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, requestContext.getRequestTime());