http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java deleted file mode 100644 index 0e90336..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ /dev/null @@ -1,478 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContextV1; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.listener.EntityChangeListener; -import org.apache.atlas.listener.EntityChangeListenerV2; -import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; -import org.apache.atlas.model.glossary.AtlasGlossaryTerm; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; - -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.model.instance.AtlasRelatedObjectId; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.model.instance.EntityMutations.EntityOperation; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.converters.AtlasInstanceConverter; -import org.apache.atlas.repository.graph.FullTextMapperV2; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.util.AtlasRepositoryConfiguration; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; -import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; - - -@Component -public class AtlasEntityChangeNotifier { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); - - private final Set<EntityChangeListener> entityChangeListeners; - private final Set<EntityChangeListenerV2> entityChangeListenersV2; - private final AtlasInstanceConverter instanceConverter; - private final FullTextMapperV2 fullTextMapperV2; - private final AtlasTypeRegistry atlasTypeRegistry; - - - @Inject - public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, - Set<EntityChangeListenerV2> entityChangeListenersV2, - AtlasInstanceConverter instanceConverter, - FullTextMapperV2 fullTextMapperV2, - AtlasTypeRegistry atlasTypeRegistry) { - this.entityChangeListeners = entityChangeListeners; - this.entityChangeListenersV2 = entityChangeListenersV2; - this.instanceConverter = instanceConverter; - this.fullTextMapperV2 = fullTextMapperV2; - this.atlasTypeRegistry = atlasTypeRegistry; - } - - public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { - if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) { - return; - } - - List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities(); - List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities(); - List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities(); - List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities(); - - // complete full text mapping before calling toReferenceables(), from notifyListners(), to - // include all vertex updates in the current graph-transaction - doFullTextMapping(createdEntities); - doFullTextMapping(updatedEntities); - doFullTextMapping(partiallyUpdatedEntities); - - notifyListeners(createdEntities, EntityOperation.CREATE, isImport); - notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport); - notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport); - notifyListeners(deletedEntities, EntityOperation.DELETE, isImport); - - notifyPropagatedEntities(); - } - - public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { - doFullTextMapping(entity.getGuid()); - - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onClassificationsAdded(entity, addedClassifications); - } - } else { - updateFullTextMapping(entity.getGuid(), addedClassifications); - - Referenceable entityRef = toReferenceable(entity.getGuid()); - List<Struct> traits = toStruct(addedClassifications); - - if (entity == null || CollectionUtils.isEmpty(traits)) { - return; - } - - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsAdded(entityRef, traits); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd"); - } - } - } - } - - public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { - doFullTextMapping(entity.getGuid()); - - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onClassificationsUpdated(entity, updatedClassifications); - } - } else { - doFullTextMapping(entity.getGuid()); - - Referenceable entityRef = toReferenceable(entity.getGuid()); - List<Struct> traits = toStruct(updatedClassifications); - - if (entityRef == null || CollectionUtils.isEmpty(traits)) { - return; - } - - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsUpdated(entityRef, traits); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate"); - } - } - } - } - - public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { - if (isV2EntityNotificationEnabled()) { - doFullTextMapping(entity.getGuid()); - - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onClassificationsDeleted(entity, deletedClassifications); - } - } else { - doFullTextMapping(entity.getGuid()); - - Referenceable entityRef = toReferenceable(entity.getGuid()); - List<Struct> traits = toStruct(deletedClassifications); - - if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) { - return; - } - - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTraitsDeleted(entityRef, traits); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); - } - } - - } - } - - public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { - // listeners notified on term-entity association only if v2 notifications are enabled - if (isV2EntityNotificationEnabled()) { - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onTermAdded(term, entityIds); - } - } else { - List<Referenceable> entityRefs = toReferenceables(entityIds); - - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTermAdded(entityRefs, term); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermAdd"); - } - } - } - } - - public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { - // listeners notified on term-entity disassociation only if v2 notifications are enabled - if (isV2EntityNotificationEnabled()) { - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onTermDeleted(term, entityIds); - } - } else { - List<Referenceable> entityRefs = toReferenceables(entityIds); - - for (EntityChangeListener listener : entityChangeListeners) { - try { - listener.onTermDeleted(entityRefs, term); - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermDelete"); - } - } - } - } - - public void notifyPropagatedEntities() throws AtlasBaseException { - RequestContextV1 context = RequestContextV1.get(); - Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); - Map<String, List<AtlasClassification>> removedPropagations = context.getRemovedPropagations(); - - notifyPropagatedEntities(addedPropagations, PROPAGATED_CLASSIFICATION_ADD); - notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE); - } - - private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException { - if (MapUtils.isEmpty(entityPropagationMap) || action == null) { - return; - } - - for (String guid : entityPropagationMap.keySet()) { - AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid); - AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - - if (entity == null) { - continue; - } - - if (action == PROPAGATED_CLASSIFICATION_ADD) { - onClassificationAddedToEntity(entity, entityPropagationMap.get(guid)); - } else if (action == PROPAGATED_CLASSIFICATION_DELETE) { - onClassificationDeletedFromEntity(entity, entityPropagationMap.get(guid)); - } - } - } - - private String getListenerName(EntityChangeListener listener) { - return listener.getClass().getSimpleName(); - } - - private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { - if (CollectionUtils.isEmpty(entityHeaders)) { - return; - } - - if (isV2EntityNotificationEnabled()) { - notifyV2Listeners(entityHeaders, operation, isImport); - } else { - notifyV1Listeners(entityHeaders, operation, isImport); - } - } - - private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { - List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation); - - for (EntityChangeListener listener : entityChangeListeners) { - try { - switch (operation) { - case CREATE: - listener.onEntitiesAdded(typedRefInsts, isImport); - break; - case UPDATE: - case PARTIAL_UPDATE: - listener.onEntitiesUpdated(typedRefInsts, isImport); - break; - case DELETE: - listener.onEntitiesDeleted(typedRefInsts, isImport); - break; - } - } catch (AtlasException e) { - throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString()); - } - } - } - - private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { - List<AtlasEntity> entities = toAtlasEntities(entityHeaders); - - for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - switch (operation) { - case CREATE: - listener.onEntitiesAdded(entities, isImport); - break; - case UPDATE: - case PARTIAL_UPDATE: - listener.onEntitiesUpdated(entities, isImport); - break; - case DELETE: - listener.onEntitiesDeleted(entities, isImport); - break; - } - } - } - - private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { - List<Referenceable> ret = new ArrayList<>(entityHeaders.size()); - - // delete notifications don't need all attributes. Hence the special handling for delete operation - if (operation == EntityOperation.DELETE) { - for (AtlasEntityHeader entityHeader : entityHeaders) { - ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes())); - } - } else { - for (AtlasEntityHeader entityHeader : entityHeaders) { - ret.add(toReferenceable(entityHeader.getGuid())); - } - } - - return ret; - } - - private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { - List<Referenceable> ret = new ArrayList<>(); - - if (CollectionUtils.isNotEmpty(entityIds)) { - for (AtlasRelatedObjectId relatedObjectId : entityIds) { - String entityGuid = relatedObjectId.getGuid(); - - ret.add(toReferenceable(entityGuid)); - } - } - - return ret; - } - - private Referenceable toReferenceable(String entityId) throws AtlasBaseException { - Referenceable ret = null; - - if (StringUtils.isNotEmpty(entityId)) { - ret = instanceConverter.getReferenceable(entityId); - } - - return ret; - } - - private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException { - List<Struct> ret = null; - - if (classifications != null) { - ret = new ArrayList<>(classifications.size()); - - for (AtlasClassification classification : classifications) { - if (classification != null) { - ret.add(instanceConverter.getTrait(classification)); - } - } - } - - return ret; - } - - private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException { - List<AtlasEntity> ret = new ArrayList<>(); - - if (CollectionUtils.isNotEmpty(entityHeaders)) { - for (AtlasEntityHeader entityHeader : entityHeaders) { - String entityGuid = entityHeader.getGuid(); - String typeName = entityHeader.getTypeName(); - - // Skip all internal types as the HARD DELETE will cause lookup errors - AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); - if (Objects.nonNull(entityType) && entityType.isInternalType()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping internal type = {}", typeName); - } - continue; - } - - AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); - - if (entityWithExtInfo != null) { - ret.add(entityWithExtInfo.getEntity()); - } - } - } - - return ret; - } - - private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) { - if (CollectionUtils.isEmpty(entityHeaders)) { - return; - } - - try { - if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) { - return; - } - } catch (AtlasException e) { - LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping"); - } - - for (AtlasEntityHeader entityHeader : entityHeaders) { - if(GraphHelper.isInternalType(entityHeader.getTypeName())) { - continue; - } - - String guid = entityHeader.getGuid(); - AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid); - - if(vertex == null) { - continue; - } - - try { - String fullText = fullTextMapperV2.getIndexTextForEntity(guid); - - GraphHelper.setProperty(vertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); - } catch (AtlasBaseException e) { - LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e); - } - } - } - - private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) { - try { - if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) { - return; - } - } catch (AtlasException e) { - LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping"); - } - - if (StringUtils.isEmpty(entityId) || CollectionUtils.isEmpty(classifications)) { - return; - } - - AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId); - if(atlasVertex == null || GraphHelper.isInternalType(atlasVertex)) { - return; - } - - try { - String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications); - String existingFullText = (String) GraphHelper.getProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY); - - String newFullText = existingFullText + " " + classificationFullText; - GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, newFullText); - } catch (AtlasBaseException e) { - LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e); - } - } - - private void doFullTextMapping(String guid) { - AtlasEntityHeader entityHeader = new AtlasEntityHeader(); - entityHeader.setGuid(guid); - - doFullTextMapping(Collections.singletonList(entityHeader)); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java deleted file mode 100644 index ebdba3b..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.authorize.AtlasPrivilege; -import org.apache.atlas.authorize.AtlasTypeAccessRequest; -import org.apache.atlas.authorize.AtlasAuthorizationUtils; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.typedef.AtlasEntityDef; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -/** - * EntityDef store in v1 format. - */ -public class AtlasEntityDefStoreV1 extends AtlasAbstractDefStoreV1<AtlasEntityDef> { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityDefStoreV1.class); - - @Inject - public AtlasEntityDefStoreV1(AtlasTypeDefGraphStoreV1 typeDefStore, AtlasTypeRegistry typeRegistry) { - super(typeDefStore, typeRegistry); - } - - @Override - public AtlasVertex preCreate(AtlasEntityDef entityDef) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.preCreate({})", entityDef); - } - - validateType(entityDef); - - AtlasType type = typeRegistry.getType(entityDef.getName()); - - if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); - } - - - - AtlasVertex ret = typeDefStore.findTypeVertexByName(entityDef.getName()); - - if (ret != null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, entityDef.getName()); - } - - ret = typeDefStore.createTypeVertex(entityDef); - - updateVertexPreCreate(entityDef, (AtlasEntityType)type, ret); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.preCreate({}): {}", entityDef, ret); - } - - return ret; - } - - @Override - public AtlasEntityDef create(AtlasEntityDef entityDef, AtlasVertex preCreateResult) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.create({}, {})", entityDef, preCreateResult); - } - - AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, entityDef), "create entity-def ", entityDef.getName()); - - AtlasVertex vertex = (preCreateResult == null) ? preCreate(entityDef) : preCreateResult; - - updateVertexAddReferences(entityDef, vertex); - - AtlasEntityDef ret = toEntityDef(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.create({}, {}): {}", entityDef, preCreateResult, ret); - } - - return ret; - } - - @Override - public List<AtlasEntityDef> getAll() throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.getAll()"); - } - - List<AtlasEntityDef> ret = new ArrayList<>(); - Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.CLASS); - - while (vertices.hasNext()) { - ret.add(toEntityDef(vertices.next())); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.getAll(): count={}", ret.size()); - } - - return ret; - } - - @Override - public AtlasEntityDef getByName(String name) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.getByName({})", name); - } - - AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); - - if (vertex == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); - } - - vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); - - AtlasEntityDef ret = toEntityDef(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.getByName({}): {}", name, ret); - } - - return ret; - } - - @Override - public AtlasEntityDef getByGuid(String guid) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.getByGuid({})", guid); - } - - AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); - - if (vertex == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); - } - - AtlasEntityDef ret = toEntityDef(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.getByGuid({}): {}", guid, ret); - } - - return ret; - } - - @Override - public AtlasEntityDef update(AtlasEntityDef entityDef) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.update({})", entityDef); - } - - validateType(entityDef); - - AtlasEntityDef ret = StringUtils.isNotBlank(entityDef.getGuid()) ? updateByGuid(entityDef.getGuid(), entityDef) - : updateByName(entityDef.getName(), entityDef); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.update({}): {}", entityDef, ret); - } - - return ret; - } - - @Override - public AtlasEntityDef updateByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.updateByName({}, {})", name, entityDef); - } - - AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name); - - AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", name); - - validateType(entityDef); - - AtlasType type = typeRegistry.getType(entityDef.getName()); - - if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); - } - - AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); - - if (vertex == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); - } - - updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex); - updateVertexAddReferences(entityDef, vertex); - - AtlasEntityDef ret = toEntityDef(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.updateByName({}, {}): {}", name, entityDef, ret); - } - - return ret; - } - - @Override - public AtlasEntityDef updateByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.updateByGuid({})", guid); - } - - AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", (existingDef != null ? existingDef.getName() : guid)); - - validateType(entityDef); - - AtlasType type = typeRegistry.getTypeByGuid(guid); - - if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); - } - - AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); - - if (vertex == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); - } - - updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex); - updateVertexAddReferences(entityDef, vertex); - - AtlasEntityDef ret = toEntityDef(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.updateByGuid({}): {}", guid, ret); - } - - return ret; - } - - @Override - public AtlasVertex preDeleteByName(String name) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByName({})", name); - } - - AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name); - - AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", name); - - AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); - - if (AtlasGraphUtilsV1.typeHasInstanceVertex(name)) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name); - } - - if (ret == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); - } - - // error if we are trying to delete an entityDef that has a relationshipDef - if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV1.RELATIONSHIPTYPE_EDGE_LABEL)){ - throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, name); - } - - typeDefStore.deleteTypeVertexOutEdges(ret); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByName({}): {}", name, ret); - } - - return ret; - } - - @Override - public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByGuid({})", guid); - } - - AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", (existingDef != null ? existingDef.getName() : guid)); - - AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); - - String typeName = AtlasGraphUtilsV1.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class); - - if (AtlasGraphUtilsV1.typeHasInstanceVertex(typeName)) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName); - } - - if (ret == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); - } - - // error if we are trying to delete an entityDef that has a relationshipDef - if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV1.RELATIONSHIPTYPE_EDGE_LABEL)){ - throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, typeName); - } - - typeDefStore.deleteTypeVertexOutEdges(ret); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByGuid({}): {}", guid, ret); - } - - return ret; - } - - private void updateVertexPreCreate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex) throws AtlasBaseException { - AtlasStructDefStoreV1.updateVertexPreCreate(entityDef, entityType, vertex, typeDefStore); - } - - private void updateVertexPreUpdate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex) - throws AtlasBaseException { - AtlasStructDefStoreV1.updateVertexPreUpdate(entityDef, entityType, vertex, typeDefStore); - } - - private void updateVertexAddReferences(AtlasEntityDef entityDef, AtlasVertex vertex) throws AtlasBaseException { - AtlasStructDefStoreV1.updateVertexAddReferences(entityDef, vertex, typeDefStore); - - typeDefStore.createSuperTypeEdges(vertex, entityDef.getSuperTypes(), TypeCategory.CLASS); - } - - private AtlasEntityDef toEntityDef(AtlasVertex vertex) throws AtlasBaseException { - AtlasEntityDef ret = null; - - if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.CLASS)) { - ret = new AtlasEntityDef(); - - AtlasStructDefStoreV1.toStructDef(vertex, ret, typeDefStore); - - ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex)); - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java deleted file mode 100644 index e31ca4d..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.AtlasStruct; -import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; -import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; -import org.apache.atlas.repository.store.graph.EntityResolver; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasMapType; -import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - - -public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class); - - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphDiscoveryContext discoveryContext; - - public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) { - this.typeRegistry = typeRegistry; - this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream); - } - - @Override - public void init() throws AtlasBaseException { - //Nothing to do - } - - @Override - public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException { - // walk through entities in stream and validate them; record entity references - discover(); - - // resolve entity references discovered in previous step - resolveReferences(); - - return discoveryContext; - } - - @Override - public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException { - List<String> messages = new ArrayList<>(); - - if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid()); - } - - AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); - } - - type.validateValue(entity, entity.getTypeName(), messages); - - if (!messages.isEmpty()) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); - } - - type.getNormalizedValue(entity); - } - - @Override - public void validateAndNormalizeForUpdate(AtlasEntity entity) throws AtlasBaseException { - List<String> messages = new ArrayList<>(); - - if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid()); - } - - AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); - } - - type.validateValueForUpdate(entity, entity.getTypeName(), messages); - - if (!messages.isEmpty()) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); - } - - type.getNormalizedValueForUpdate(entity); - } - - @Override - public void cleanUp() throws AtlasBaseException { - discoveryContext.cleanUp(); - } - - - protected void discover() throws AtlasBaseException { - EntityStream entityStream = discoveryContext.getEntityStream(); - - Set<String> walkedEntities = new HashSet<>(); - - // walk through top-level entities and find entity references - while (entityStream.hasNext()) { - AtlasEntity entity = entityStream.next(); - - if (entity == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity"); - } - - walkEntityGraph(entity); - - walkedEntities.add(entity.getGuid()); - } - - // walk through entities referenced by other entities - // referencedGuids will be updated within this for() loop; avoid use of iterators - List<String> referencedGuids = discoveryContext.getReferencedGuids(); - for (int i = 0; i < referencedGuids.size(); i++) { - String guid = referencedGuids.get(i); - - if (walkedEntities.contains(guid)) { - continue; - } - - AtlasEntity entity = entityStream.getByGuid(guid); - - if (entity != null) { - walkEntityGraph(entity); - - walkedEntities.add(entity.getGuid()); - } - } - } - - protected void resolveReferences() throws AtlasBaseException { - EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry), - new UniqAttrBasedEntityResolver(typeRegistry) - }; - - for (EntityResolver resolver : entityResolvers) { - resolver.resolveEntityReferences(discoveryContext); - } - } - - private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException { - if (type == null || val == null) { - return; - } - - if (val instanceof AtlasObjectId) { - AtlasObjectId objId = (AtlasObjectId)val; - - if (!AtlasTypeUtil.isValid(objId)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); - } - - recordObjectReference(objId); - } else if (val instanceof Map) { - AtlasObjectId objId = new AtlasObjectId((Map)val); - - if (!AtlasTypeUtil.isValid(objId)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); - } - - recordObjectReference(objId); - } else { - throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); - } - } - - void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException { - if (attrType == null || val == null) { - return; - } - - switch (attrType.getTypeCategory()) { - case PRIMITIVE: - case ENUM: - return; - - case ARRAY: { - AtlasArrayType arrayType = (AtlasArrayType) attrType; - AtlasType elemType = arrayType.getElementType(); - - visitCollectionReferences(elemType, val); - } - break; - - case MAP: { - AtlasType keyType = ((AtlasMapType) attrType).getKeyType(); - AtlasType valueType = ((AtlasMapType) attrType).getValueType(); - - visitMapReferences(keyType, valueType, val); - } - break; - - case STRUCT: - visitStruct((AtlasStructType)attrType, val); - break; - - case OBJECT_ID_TYPE: - visitReference((AtlasObjectIdType) attrType, val); - break; - - default: - throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, attrType.getTypeCategory().name()); - } - } - - void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException { - if (keyType == null || valueType == null || val == null) { - return; - } - - if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) { - return; - } - - if (Map.class.isAssignableFrom(val.getClass())) { - Iterator<Map.Entry> it = ((Map) val).entrySet().iterator(); - while (it.hasNext()) { - Map.Entry e = it.next(); - visitAttribute(keyType, e.getKey()); - visitAttribute(valueType, e.getValue()); - } - } - } - - void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException { - if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) { - return; - } - - Iterator it = null; - - if (val instanceof Collection) { - it = ((Collection) val).iterator(); - } else if (val instanceof Iterable) { - it = ((Iterable) val).iterator(); - } else if (val instanceof Iterator) { - it = (Iterator) val; - } - - if (it != null) { - while (it.hasNext()) { - Object elem = it.next(); - visitAttribute(elemType, elem); - } - } - } - - void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException { - if (structType == null || val == null) { - return; - } - - final AtlasStruct struct; - - if (val instanceof AtlasStruct) { - struct = (AtlasStruct) val; - } else if (val instanceof Map) { - Map attributes = AtlasTypeUtil.toStructAttributes((Map) val); - - struct = new AtlasStruct(structType.getTypeName(), attributes); - } else { - throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString()); - } - - visitStruct(structType, struct); - } - - void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException { - List<String> visitedAttributes = new ArrayList<>(); - - // visit relationship attributes - visitRelationships(entityType, entity, visitedAttributes); - - // visit struct attributes - for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { - AtlasType attrType = attribute.getAttributeType(); - String attrName = attribute.getName(); - Object attrVal = entity.getAttribute(attrName); - - if (entity.hasAttribute(attrName) && !visitedAttributes.contains(attrName)) { - visitAttribute(attrType, attrVal); - } - } - } - - private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException { - for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { - AtlasType attrType = attribute.getAttributeType(); - String attrName = attribute.getName(); - Object attrVal = entity.getRelationshipAttribute(attrName); - - if (entity.hasRelationshipAttribute(attrName)) { - visitAttribute(attrType, attrVal); - - visitedAttributes.add(attrName); - } - } - } - - void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException { - for (AtlasAttribute attribute : structType.getAllAttributes().values()) { - AtlasType attrType = attribute.getAttributeType(); - Object attrVal = struct.getAttribute(attribute.getName()); - - visitAttribute(attrType, attrVal); - } - } - - void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException { - if (entity == null) { - return; - } - - AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); - } - - recordObjectReference(entity.getGuid()); - - visitEntity(type, entity); - } - - - boolean isPrimitive(TypeCategory typeCategory) { - return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM; - } - - private void recordObjectReference(String guid) { - discoveryContext.addReferencedGuid(guid); - } - - private void recordObjectReference(AtlasObjectId objId) { - if (AtlasTypeUtil.isValidGuid(objId)) { - discoveryContext.addReferencedGuid(objId.getGuid()); - } else { - discoveryContext.addReferencedByUniqAttribs(objId); - } - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java deleted file mode 100644 index 528065c..0000000 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ /dev/null @@ -1,781 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.repository.store.graph.v1; - - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.GraphTransactionInterceptor; -import org.apache.atlas.RequestContextV1; -import org.apache.atlas.annotation.GraphTransaction; -import org.apache.atlas.authorize.AtlasEntityAccessRequest; -import org.apache.atlas.authorize.AtlasPrivilege; -import org.apache.atlas.authorize.AtlasAuthorizationUtils; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.*; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; -import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.utils.AtlasEntityUtil; -import org.apache.atlas.utils.AtlasPerfTracer; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.*; - -import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; - - -@Component -public class AtlasEntityStoreV1 implements AtlasEntityStore { - private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore"); - - - private final DeleteHandlerV1 deleteHandler; - private final AtlasTypeRegistry typeRegistry; - private final AtlasEntityChangeNotifier entityChangeNotifier; - private final EntityGraphMapper entityGraphMapper; - private final EntityGraphRetriever entityRetriever; - - @Inject - public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, - AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { - this.deleteHandler = deleteHandler; - this.typeRegistry = typeRegistry; - this.entityChangeNotifier = entityChangeNotifier; - this.entityGraphMapper = entityGraphMapper; - this.entityRetriever = new EntityGraphRetriever(typeRegistry); - } - - @Override - @GraphTransaction - public List<String> getEntityGUIDS(final String typename) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGUIDS({})", typename); - } - - if (StringUtils.isEmpty(typename) || !typeRegistry.isRegisteredType(typename)) { - throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME); - } - - List<String> ret = AtlasGraphUtilsV1.findEntityGUIDsByType(typename); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGUIDS({})", typename); - } - - return ret; - } - - @Override - @GraphTransaction - public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getById({})", guid); - } - - AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: guid=", guid); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getById({}): {}", guid, ret); - } - - return ret; - } - - @Override - @GraphTransaction - public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getByIds({})", guids); - } - - AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids); - - // verify authorization to read the entities - if(ret != null){ - for(String guid : guids){ - AtlasEntity entity = ret.getEntity(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getByIds({}): {}", guids, ret); - } - - return ret; - } - - @Override - @GraphTransaction - public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes); - } - - AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes); - AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret); - } - - return ret; - } - - @Override - @GraphTransaction - public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException { - return createOrUpdate(entityStream, isPartialUpdate, false); - } - - @Override - @GraphTransaction - public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException { - return createOrUpdate(entityStream, false, true); - } - - @Override - @GraphTransaction - public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate); - } - - if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity"); - } - - final String guid; - - if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) { - guid = objectId.getGuid(); - } else { - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName()); - - if (entityType == null) { - throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName()); - } - - guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes()); - } - - AtlasEntity entity = updatedEntityInfo.getEntity(); - - entity.setGuid(guid); - - return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false); - } - - @Override - @GraphTransaction - public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, - AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes); - } - - if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); - } - - String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes); - AtlasEntity entity = updatedEntityInfo.getEntity(); - - entity.setGuid(guid); - - return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false); - } - - @Override - @GraphTransaction - public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue) - throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue); - } - - AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); - AtlasAttribute attr = entityType.getAttribute(attrName); - - if (attr == null) { - throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName()); - } - - AtlasType attrType = attr.getAttributeType(); - AtlasEntity updateEntity = new AtlasEntity(); - - updateEntity.setGuid(guid); - updateEntity.setTypeName(entity.getTypeName()); - - switch (attrType.getTypeCategory()) { - case PRIMITIVE: - updateEntity.setAttribute(attrName, attrValue); - break; - case OBJECT_ID_TYPE: - AtlasObjectId objId; - - if (attrValue instanceof String) { - objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName()); - } else { - objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue); - } - - updateEntity.setAttribute(attrName, objId); - break; - - default: - throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName()); - } - - return createOrUpdate(new AtlasEntityStream(updateEntity), true, false); - } - - @Override - @GraphTransaction - public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException { - if (StringUtils.isEmpty(guid)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); - } - - Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); - AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid); - - if (vertex != null) { - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); - - deletionCandidates.add(vertex); - } else { - if (LOG.isDebugEnabled()) { - // Entity does not exist - treat as non-error, since the caller - // wanted to delete the entity and it's already gone. - LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); - } - } - - EntityMutationResponse ret = deleteVertices(deletionCandidates); - - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, false); - - return ret; - } - - @Override - @GraphTransaction - public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException { - if (CollectionUtils.isEmpty(guids)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); - } - - Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); - - for (String guid : guids) { - AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid); - - if (vertex == null) { - if (LOG.isDebugEnabled()) { - // Entity does not exist - treat as non-error, since the caller - // wanted to delete the entity and it's already gone. - LOG.debug("Deletion request ignored for non-existent entity with guid " + guid); - } - - continue; - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid); - - deletionCandidates.add(vertex); - } - - if (deletionCandidates.isEmpty()) { - LOG.info("No deletion candidate entities were found for guids %s", guids); - } - - EntityMutationResponse ret = deleteVertices(deletionCandidates); - - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, false); - - return ret; - } - - @Override - @GraphTransaction - public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException { - if (MapUtils.isEmpty(uniqAttributes)) { - throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString()); - } - - Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); - AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, uniqAttributes); - - if (vertex != null) { - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes); - - deletionCandidates.add(vertex); - } else { - if (LOG.isDebugEnabled()) { - // Entity does not exist - treat as non-error, since the caller - // wanted to delete the entity and it's already gone. - LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes); - } - } - - EntityMutationResponse ret = deleteVertices(deletionCandidates); - - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, false); - - return ret; - } - - @Override - @GraphTransaction - public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException{ - return AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes); - } - - @Override - @GraphTransaction - public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException { - if (StringUtils.isEmpty(guid)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); - } - - if (CollectionUtils.isEmpty(classifications)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding classifications={} to entity={}", classifications, guid); - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - for (AtlasClassification classification : classifications) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), - "add classification: guid=", guid, ", classification=", classification.getTypeName()); - } - - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); - for (AtlasClassification classification : classifications) { - validateAndNormalize(classification); - } - - // validate if entity, not already associated with classifications - validateEntityAssociations(guid, classifications); - - entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); - } - - @Override - @GraphTransaction - public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("Updating classifications={} for entity={}", classifications, guid); - } - - if (StringUtils.isEmpty(guid)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified"); - } - - if (CollectionUtils.isEmpty(classifications)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - for (AtlasClassification classification : classifications) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName()); - } - - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); - - entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications); - } - - @Override - @GraphTransaction - public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException { - if (CollectionUtils.isEmpty(guids)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); - } - if (classification == null) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified"); - } - - for (String guid : guids) { - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification), - "add classification: guid=", guid, ", classification=", classification.getTypeName()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding classification={} to entities={}", classification, guids); - } - - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids); - - validateAndNormalize(classification); - - List<AtlasClassification> classifications = Collections.singletonList(classification); - - for (String guid : guids) { - validateEntityAssociations(guid, classifications); - - entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications); - } - } - - @Override - @GraphTransaction - public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException { - if (StringUtils.isEmpty(guid)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified"); - } - if (CollectionUtils.isEmpty(classificationNames)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified"); - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - for (String classification : classificationNames) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION, entityHeader, new AtlasClassification(classification)), "remove classification: guid=", guid, ", classification=", classification); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid); - } - - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid); - - entityGraphMapper.deleteClassifications(guid, classificationNames); - } - - - @GraphTransaction - public List<AtlasClassification> retrieveClassifications(String guid) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("Retriving classifications for entity={}", guid); - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - return entityHeader.getClassifications(); - } - - - @Override - @GraphTransaction - public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting classifications for entity={}", guid); - } - - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classifications: guid=", guid); - - return entityHeader.getClassifications(); - } - - @Override - @GraphTransaction - public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting classifications for entities={}", guid); - } - - AtlasClassification ret = null; - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - - if (CollectionUtils.isNotEmpty(entityHeader.getClassifications())) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classification: guid=", guid, ", classification=", classificationName); - - for (AtlasClassification classification : entityHeader.getClassifications()) { - if (!StringUtils.equalsIgnoreCase(classification.getTypeName(), classificationName)) { - continue; - } - - if (StringUtils.isEmpty(classification.getEntityGuid()) || StringUtils.equalsIgnoreCase(classification.getEntityGuid(), guid)) { - ret = classification; - break; - } else if (ret == null) { - ret = classification; - } - } - } - - if (ret == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName); - } - - return ret; - } - - private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> createOrUpdate()"); - } - - if (entityStream == null || !entityStream.hasNext()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); - } - - AtlasPerfTracer perf = null; - - if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { - perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()"); - } - - try { - final boolean isImport = entityStream instanceof EntityImportStream; - final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate); - - // Check if authorized to create entities - if (!isImport && CollectionUtils.isNotEmpty(context.getCreatedEntities())) { - for (AtlasEntity entity : context.getCreatedEntities()) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)), - "create entity: type=", entity.getTypeName()); - } - } - - // for existing entities, skip update if incoming entity doesn't have any change - if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) { - List<AtlasEntity> entitiesToSkipUpdate = null; - - for (AtlasEntity entity : context.getUpdatedEntities()) { - String guid = entity.getGuid(); - AtlasVertex vertex = context.getVertex(guid); - AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) { - // if classifications are to be replaced as well, then skip updates only when no change in classifications as well - if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) { - if (entitiesToSkipUpdate == null) { - entitiesToSkipUpdate = new ArrayList<>(); - } - - entitiesToSkipUpdate.add(entity); - } - } - } - - if (entitiesToSkipUpdate != null) { - context.getUpdatedEntities().removeAll(entitiesToSkipUpdate); - } - - // Check if authorized to update entities - if (!isImport) { - for (AtlasEntity entity : context.getUpdatedEntities()) { - AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)), - "update entity: type=", entity.getTypeName()); - } - } - } - - EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications); - - ret.setGuidAssignments(context.getGuidAssignments()); - - // Notify the change listeners - entityChangeNotifier.onEntitiesMutated(ret, isImport); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== createOrUpdate()"); - } - - return ret; - } finally { - AtlasPerfTracer.log(perf); - } - } - - private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { - EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream); - EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); - EntityMutationContext context = new EntityMutationContext(discoveryContext); - - for (String guid : discoveryContext.getReferencedGuids()) { - AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid); - AtlasEntity entity = entityStream.getByGuid(guid); - - if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream - if (vertex != null) { - if (!isPartialUpdate) { - graphDiscoverer.validateAndNormalize(entity); - } else { - graphDiscoverer.validateAndNormalizeForUpdate(entity); - } - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - String guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex); - - if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute - entity.setGuid(guidVertex); - } - - context.addUpdated(guid, entity, entityType, vertex); - } else { - graphDiscoverer.validateAndNormalize(entity); - - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); - - - //Create vertices which do not exist in the repository - if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) { - vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid()); - } else { - vertex = entityGraphMapper.createVertex(entity); - } - - discoveryContext.addResolvedGuid(guid, vertex); - - String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex); - - entity.setGuid(generatedGuid); - - context.addCreated(guid, entity, entityType, vertex); - } - - // during import, update the system attributes - if (entityStream instanceof EntityImportStream) { - entityGraphMapper.updateSystemAttributes(vertex, entity); - } - } - } - - return context; - } - - private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException { - EntityMutationResponse response = new EntityMutationResponse(); - RequestContextV1 req = RequestContextV1.get(); - - deleteHandler.deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities - - for (AtlasObjectId entity : req.getDeletedEntities()) { - response.addEntity(DELETE, entity); - } - - for (AtlasObjectId entity : req.getUpdatedEntities()) { - response.addEntity(UPDATE, entity); - } - - return response; - } - - private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException { - AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName()); - - if (type == null) { - throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName()); - } - - List<String> messages = new ArrayList<>(); - - type.validateValue(classification, classification.getTypeName(), messages); - - if (!messages.isEmpty()) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages); - } - - type.getNormalizedValue(classification); - } - - /** - * Validate if classification is not already associated with the entities - * - * @param guid unique entity id - * @param classifications list of classifications to be associated - */ - private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { - List<String> entityClassifications = getClassificationNames(guid); - String entityTypeName = AtlasGraphUtilsV1.getTypeNameFromGuid(guid); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); - - for (AtlasClassification classification : classifications) { - String newClassification = classification.getTypeName(); - - if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid + - ", already associated with classification: " + newClassification); - } - - // for each classification, check whether there are entities it should be restricted to - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification); - - if (!classificationType.canApplyToEntityType(entityType)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification); - } - } - } - - private List<String> getClassificationNames(String guid) throws AtlasBaseException { - List<String> ret = null; - List<AtlasClassification> classifications = retrieveClassifications(guid); - - if (CollectionUtils.isNotEmpty(classifications)) { - ret = new ArrayList<>(); - - for (AtlasClassification classification : classifications) { - String entityGuid = classification.getEntityGuid(); - - if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) { - ret.add(classification.getTypeName()); - } - } - } - - return ret; - } -}