http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java new file mode 100644 index 0000000..4082fde --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasClassificationDefStoreV2.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasTypeAccessRequest; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * ClassificationDef store in v1 format. + */ +class AtlasClassificationDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasClassificationDef> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasClassificationDefStoreV2.class); + + private static final String TRAIT_NAME_REGEX = "[a-zA-Z][a-zA-Z0-9_ .]*"; + + private static final Pattern TRAIT_NAME_PATTERN = Pattern.compile(TRAIT_NAME_REGEX); + + public AtlasClassificationDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry); + } + + @Override + public AtlasVertex preCreate(AtlasClassificationDef classificationDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.preCreate({})", classificationDef); + } + + validateType(classificationDef); + + AtlasType type = typeRegistry.getType(classificationDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.CLASSIFICATION) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, classificationDef.getName(), TypeCategory.TRAIT.name()); + } + + AtlasVertex ret = typeDefStore.findTypeVertexByName(classificationDef.getName()); + + if (ret != null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, classificationDef.getName()); + } + + ret = typeDefStore.createTypeVertex(classificationDef); + + updateVertexPreCreate(classificationDef, (AtlasClassificationType)type, ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.preCreate({}): {}", classificationDef, ret); + } + + return ret; + } + + @Override + public AtlasClassificationDef create(AtlasClassificationDef classificationDef, AtlasVertex preCreateResult) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.create({}, {})", classificationDef, preCreateResult); + } + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, classificationDef), "create classification-def ", classificationDef.getName()); + + AtlasVertex vertex = (preCreateResult == null) ? preCreate(classificationDef) : preCreateResult; + + updateVertexAddReferences(classificationDef, vertex); + + AtlasClassificationDef ret = toClassificationDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.create({}, {}): {}", classificationDef, preCreateResult, ret); + } + + return ret; + } + + @Override + public List<AtlasClassificationDef> getAll() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.getAll()"); + } + + List<AtlasClassificationDef> ret = new ArrayList<>(); + + Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.TRAIT); + while (vertices.hasNext()) { + ret.add(toClassificationDef(vertices.next())); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.getAll(): count={}", ret.size()); + } + return ret; + } + + @Override + public AtlasClassificationDef getByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.getByName({})", name); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.TRAIT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); + + AtlasClassificationDef ret = toClassificationDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.getByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasClassificationDef getByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.getByGuid({})", guid); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.TRAIT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + AtlasClassificationDef ret = toClassificationDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.getByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasClassificationDef update(AtlasClassificationDef classifiDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.update({})", classifiDef); + } + + validateType(classifiDef); + + AtlasClassificationDef ret = StringUtils.isNotBlank(classifiDef.getGuid()) + ? updateByGuid(classifiDef.getGuid(), classifiDef) : updateByName(classifiDef.getName(), classifiDef); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.update({}): {}", classifiDef, ret); + } + + return ret; + } + + @Override + public AtlasClassificationDef updateByName(String name, AtlasClassificationDef classificationDef) + throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.updateByName({}, {})", name, classificationDef); + } + + AtlasClassificationDef existingDef = typeRegistry.getClassificationDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update classification-def ", name); + + validateType(classificationDef); + + AtlasType type = typeRegistry.getType(classificationDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.CLASSIFICATION) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, classificationDef.getName(), TypeCategory.TRAIT.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.TRAIT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + updateVertexPreUpdate(classificationDef, (AtlasClassificationType)type, vertex); + updateVertexAddReferences(classificationDef, vertex); + + AtlasClassificationDef ret = toClassificationDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.updateByName({}, {}): {}", name, classificationDef, ret); + } + + return ret; + } + + @Override + public AtlasClassificationDef updateByGuid(String guid, AtlasClassificationDef classificationDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.updateByGuid({})", guid); + } + + AtlasClassificationDef existingDef = typeRegistry.getClassificationDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update classification-def ", (existingDef != null ? existingDef.getName() : guid)); + + validateType(classificationDef); + + AtlasType type = typeRegistry.getTypeByGuid(guid); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.CLASSIFICATION) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, classificationDef.getName(), TypeCategory.TRAIT.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.TRAIT); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + updateVertexPreUpdate(classificationDef, (AtlasClassificationType)type, vertex); + updateVertexAddReferences(classificationDef, vertex); + + AtlasClassificationDef ret = toClassificationDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.updateByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.preDeleteByName({})", name); + } + + AtlasClassificationDef existingDef = typeRegistry.getClassificationDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete classification-def ", name); + + AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.TRAIT); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(name)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.preDeleteByName({}): ret=", name, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasClassificationDefStoreV1.preDeleteByGuid({})", guid); + } + + AtlasClassificationDef existingDef = typeRegistry.getClassificationDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete classification-def ", (existingDef != null ? existingDef.getName() : guid)); + + AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.TRAIT); + + String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(typeName)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasClassificationDefStoreV1.preDeleteByGuid({}): ret=", guid, ret); + } + + return ret; + } + + private void updateVertexPreCreate(AtlasClassificationDef classificationDef, + AtlasClassificationType classificationType, + AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexPreCreate(classificationDef, classificationType, vertex, typeDefStore); + } + + private void updateVertexPreUpdate(AtlasClassificationDef classificationDef, + AtlasClassificationType classificationType, + AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexPreUpdate(classificationDef, classificationType, vertex, typeDefStore); + } + + private void updateVertexAddReferences(AtlasClassificationDef classificationDef, AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexAddReferences(classificationDef, vertex, typeDefStore); + + typeDefStore.createSuperTypeEdges(vertex, classificationDef.getSuperTypes(), TypeCategory.TRAIT); + // create edges from this vertex to entity Type vertices with the supplied entity type names + typeDefStore.createEntityTypeEdges(vertex, classificationDef.getEntityTypes()); + } + + private AtlasClassificationDef toClassificationDef(AtlasVertex vertex) throws AtlasBaseException { + AtlasClassificationDef ret = null; + + if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.TRAIT)) { + ret = new AtlasClassificationDef(); + + AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore); + + ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex)); + ret.setEntityTypes(typeDefStore.getEntityTypeNames(vertex)); + } + + return ret; + } + + @Override + public boolean isValidName(String typeName) { + Matcher m = TRAIT_NAME_PATTERN.matcher(typeName); + + return m.matches(); + } +}
http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java new file mode 100644 index 0000000..7ed99a4 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; + +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations.EntityOperation; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.graph.FullTextMapperV2; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.util.AtlasRepositoryConfiguration; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; +import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; + + +@Component +public class AtlasEntityChangeNotifier { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); + + private final Set<EntityChangeListener> entityChangeListeners; + private final Set<EntityChangeListenerV2> entityChangeListenersV2; + private final AtlasInstanceConverter instanceConverter; + private final FullTextMapperV2 fullTextMapperV2; + private final AtlasTypeRegistry atlasTypeRegistry; + + + @Inject + public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, + Set<EntityChangeListenerV2> entityChangeListenersV2, + AtlasInstanceConverter instanceConverter, + FullTextMapperV2 fullTextMapperV2, + AtlasTypeRegistry atlasTypeRegistry) { + this.entityChangeListeners = entityChangeListeners; + this.entityChangeListenersV2 = entityChangeListenersV2; + this.instanceConverter = instanceConverter; + this.fullTextMapperV2 = fullTextMapperV2; + this.atlasTypeRegistry = atlasTypeRegistry; + } + + public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { + if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) { + return; + } + + List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities(); + List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities(); + List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities(); + List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities(); + + // complete full text mapping before calling toReferenceables(), from notifyListners(), to + // include all vertex updates in the current graph-transaction + doFullTextMapping(createdEntities); + doFullTextMapping(updatedEntities); + doFullTextMapping(partiallyUpdatedEntities); + + notifyListeners(createdEntities, EntityOperation.CREATE, isImport); + notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport); + notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport); + notifyListeners(deletedEntities, EntityOperation.DELETE, isImport); + + notifyPropagatedEntities(); + } + + public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsAdded(entity, addedClassifications); + } + } else { + updateFullTextMapping(entity.getGuid(), addedClassifications); + + Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(addedClassifications); + + if (entity == null || CollectionUtils.isEmpty(traits)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsAdded(entityRef, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd"); + } + } + } + } + + public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsUpdated(entity, updatedClassifications); + } + } else { + doFullTextMapping(entity.getGuid()); + + Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(updatedClassifications); + + if (entityRef == null || CollectionUtils.isEmpty(traits)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsUpdated(entityRef, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate"); + } + } + } + } + + public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { + if (isV2EntityNotificationEnabled()) { + doFullTextMapping(entity.getGuid()); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onClassificationsDeleted(entity, deletedClassifications); + } + } else { + doFullTextMapping(entity.getGuid()); + + Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(deletedClassifications); + + if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) { + return; + } + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTraitsDeleted(entityRef, traits); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); + } + } + + } + } + + public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + // listeners notified on term-entity association only if v2 notifications are enabled + if (isV2EntityNotificationEnabled()) { + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onTermAdded(term, entityIds); + } + } else { + List<Referenceable> entityRefs = toReferenceables(entityIds); + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTermAdded(entityRefs, term); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermAdd"); + } + } + } + } + + public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + // listeners notified on term-entity disassociation only if v2 notifications are enabled + if (isV2EntityNotificationEnabled()) { + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onTermDeleted(term, entityIds); + } + } else { + List<Referenceable> entityRefs = toReferenceables(entityIds); + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTermDeleted(entityRefs, term); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermDelete"); + } + } + } + } + + public void notifyPropagatedEntities() throws AtlasBaseException { + RequestContext context = RequestContext.get(); + Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); + Map<String, List<AtlasClassification>> removedPropagations = context.getRemovedPropagations(); + + notifyPropagatedEntities(addedPropagations, PROPAGATED_CLASSIFICATION_ADD); + notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE); + } + + private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException { + if (MapUtils.isEmpty(entityPropagationMap) || action == null) { + return; + } + + for (String guid : entityPropagationMap.keySet()) { + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid); + AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; + + if (entity == null) { + continue; + } + + if (action == PROPAGATED_CLASSIFICATION_ADD) { + onClassificationAddedToEntity(entity, entityPropagationMap.get(guid)); + } else if (action == PROPAGATED_CLASSIFICATION_DELETE) { + onClassificationDeletedFromEntity(entity, entityPropagationMap.get(guid)); + } + } + } + + private String getListenerName(EntityChangeListener listener) { + return listener.getClass().getSimpleName(); + } + + private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { + if (CollectionUtils.isEmpty(entityHeaders)) { + return; + } + + if (isV2EntityNotificationEnabled()) { + notifyV2Listeners(entityHeaders, operation, isImport); + } else { + notifyV1Listeners(entityHeaders, operation, isImport); + } + } + + private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { + List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation); + + for (EntityChangeListener listener : entityChangeListeners) { + try { + switch (operation) { + case CREATE: + listener.onEntitiesAdded(typedRefInsts, isImport); + break; + case UPDATE: + case PARTIAL_UPDATE: + listener.onEntitiesUpdated(typedRefInsts, isImport); + break; + case DELETE: + listener.onEntitiesDeleted(typedRefInsts, isImport); + break; + } + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString()); + } + } + } + + private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { + List<AtlasEntity> entities = toAtlasEntities(entityHeaders); + + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + switch (operation) { + case CREATE: + listener.onEntitiesAdded(entities, isImport); + break; + case UPDATE: + case PARTIAL_UPDATE: + listener.onEntitiesUpdated(entities, isImport); + break; + case DELETE: + listener.onEntitiesDeleted(entities, isImport); + break; + } + } + } + + private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException { + List<Referenceable> ret = new ArrayList<>(entityHeaders.size()); + + // delete notifications don't need all attributes. Hence the special handling for delete operation + if (operation == EntityOperation.DELETE) { + for (AtlasEntityHeader entityHeader : entityHeaders) { + ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes())); + } + } else { + for (AtlasEntityHeader entityHeader : entityHeaders) { + ret.add(toReferenceable(entityHeader.getGuid())); + } + } + + return ret; + } + + private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + List<Referenceable> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(entityIds)) { + for (AtlasRelatedObjectId relatedObjectId : entityIds) { + String entityGuid = relatedObjectId.getGuid(); + + ret.add(toReferenceable(entityGuid)); + } + } + + return ret; + } + + private Referenceable toReferenceable(String entityId) throws AtlasBaseException { + Referenceable ret = null; + + if (StringUtils.isNotEmpty(entityId)) { + ret = instanceConverter.getReferenceable(entityId); + } + + return ret; + } + + private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException { + List<Struct> ret = null; + + if (classifications != null) { + ret = new ArrayList<>(classifications.size()); + + for (AtlasClassification classification : classifications) { + if (classification != null) { + ret.add(instanceConverter.getTrait(classification)); + } + } + } + + return ret; + } + + private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException { + List<AtlasEntity> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(entityHeaders)) { + for (AtlasEntityHeader entityHeader : entityHeaders) { + String entityGuid = entityHeader.getGuid(); + String typeName = entityHeader.getTypeName(); + + // Skip all internal types as the HARD DELETE will cause lookup errors + AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); + if (Objects.nonNull(entityType) && entityType.isInternalType()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping internal type = {}", typeName); + } + continue; + } + + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + + if (entityWithExtInfo != null) { + ret.add(entityWithExtInfo.getEntity()); + } + } + } + + return ret; + } + + private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) { + if (CollectionUtils.isEmpty(entityHeaders)) { + return; + } + + try { + if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) { + return; + } + } catch (AtlasException e) { + LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping"); + } + + for (AtlasEntityHeader entityHeader : entityHeaders) { + if(GraphHelper.isInternalType(entityHeader.getTypeName())) { + continue; + } + + String guid = entityHeader.getGuid(); + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid); + + if(vertex == null) { + continue; + } + + try { + String fullText = fullTextMapperV2.getIndexTextForEntity(guid); + + GraphHelper.setProperty(vertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); + } catch (AtlasBaseException e) { + LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e); + } + } + } + + private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) { + try { + if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) { + return; + } + } catch (AtlasException e) { + LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping"); + } + + if (StringUtils.isEmpty(entityId) || CollectionUtils.isEmpty(classifications)) { + return; + } + + AtlasVertex atlasVertex = AtlasGraphUtilsV2.findByGuid(entityId); + if(atlasVertex == null || GraphHelper.isInternalType(atlasVertex)) { + return; + } + + try { + String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications); + String existingFullText = (String) GraphHelper.getProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY); + + String newFullText = existingFullText + " " + classificationFullText; + GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, newFullText); + } catch (AtlasBaseException e) { + LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e); + } + } + + private void doFullTextMapping(String guid) { + AtlasEntityHeader entityHeader = new AtlasEntityHeader(); + entityHeader.setGuid(guid); + + doFullTextMapping(Collections.singletonList(entityHeader)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java new file mode 100644 index 0000000..f427136 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityDefStoreV2.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.authorize.AtlasTypeAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * EntityDef store in v1 format. + */ +public class AtlasEntityDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEntityDef> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityDefStoreV2.class); + + @Inject + public AtlasEntityDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry); + } + + @Override + public AtlasVertex preCreate(AtlasEntityDef entityDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.preCreate({})", entityDef); + } + + validateType(entityDef); + + AtlasType type = typeRegistry.getType(entityDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); + } + + + + AtlasVertex ret = typeDefStore.findTypeVertexByName(entityDef.getName()); + + if (ret != null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, entityDef.getName()); + } + + ret = typeDefStore.createTypeVertex(entityDef); + + updateVertexPreCreate(entityDef, (AtlasEntityType)type, ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.preCreate({}): {}", entityDef, ret); + } + + return ret; + } + + @Override + public AtlasEntityDef create(AtlasEntityDef entityDef, AtlasVertex preCreateResult) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.create({}, {})", entityDef, preCreateResult); + } + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, entityDef), "create entity-def ", entityDef.getName()); + + AtlasVertex vertex = (preCreateResult == null) ? preCreate(entityDef) : preCreateResult; + + updateVertexAddReferences(entityDef, vertex); + + AtlasEntityDef ret = toEntityDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.create({}, {}): {}", entityDef, preCreateResult, ret); + } + + return ret; + } + + @Override + public List<AtlasEntityDef> getAll() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.getAll()"); + } + + List<AtlasEntityDef> ret = new ArrayList<>(); + Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.CLASS); + + while (vertices.hasNext()) { + ret.add(toEntityDef(vertices.next())); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.getAll(): count={}", ret.size()); + } + + return ret; + } + + @Override + public AtlasEntityDef getByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.getByName({})", name); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); + + AtlasEntityDef ret = toEntityDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.getByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasEntityDef getByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.getByGuid({})", guid); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + AtlasEntityDef ret = toEntityDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.getByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasEntityDef update(AtlasEntityDef entityDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.update({})", entityDef); + } + + validateType(entityDef); + + AtlasEntityDef ret = StringUtils.isNotBlank(entityDef.getGuid()) ? updateByGuid(entityDef.getGuid(), entityDef) + : updateByName(entityDef.getName(), entityDef); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.update({}): {}", entityDef, ret); + } + + return ret; + } + + @Override + public AtlasEntityDef updateByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.updateByName({}, {})", name, entityDef); + } + + AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", name); + + validateType(entityDef); + + AtlasType type = typeRegistry.getType(entityDef.getName()); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex); + updateVertexAddReferences(entityDef, vertex); + + AtlasEntityDef ret = toEntityDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.updateByName({}, {}): {}", name, entityDef, ret); + } + + return ret; + } + + @Override + public AtlasEntityDef updateByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.updateByGuid({})", guid); + } + + AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", (existingDef != null ? existingDef.getName() : guid)); + + validateType(entityDef); + + AtlasType type = typeRegistry.getTypeByGuid(guid); + + if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name()); + } + + AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); + + if (vertex == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex); + updateVertexAddReferences(entityDef, vertex); + + AtlasEntityDef ret = toEntityDef(vertex); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.updateByGuid({}): {}", guid, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByName(String name) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByName({})", name); + } + + AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", name); + + AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(name)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name); + } + + // error if we are trying to delete an entityDef that has a relationshipDef + if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL)){ + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, name); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByName({}): {}", name, ret); + } + + return ret; + } + + @Override + public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByGuid({})", guid); + } + + AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid); + + AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", (existingDef != null ? existingDef.getName() : guid)); + + AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS); + + String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class); + + if (AtlasGraphUtilsV2.typeHasInstanceVertex(typeName)) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName); + } + + if (ret == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid); + } + + // error if we are trying to delete an entityDef that has a relationshipDef + if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL)){ + throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, typeName); + } + + typeDefStore.deleteTypeVertexOutEdges(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByGuid({}): {}", guid, ret); + } + + return ret; + } + + private void updateVertexPreCreate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexPreCreate(entityDef, entityType, vertex, typeDefStore); + } + + private void updateVertexPreUpdate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex) + throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexPreUpdate(entityDef, entityType, vertex, typeDefStore); + } + + private void updateVertexAddReferences(AtlasEntityDef entityDef, AtlasVertex vertex) throws AtlasBaseException { + AtlasStructDefStoreV2.updateVertexAddReferences(entityDef, vertex, typeDefStore); + + typeDefStore.createSuperTypeEdges(vertex, entityDef.getSuperTypes(), TypeCategory.CLASS); + } + + private AtlasEntityDef toEntityDef(AtlasVertex vertex) throws AtlasBaseException { + AtlasEntityDef ret = null; + + if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.CLASS)) { + ret = new AtlasEntityDef(); + + AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore); + + ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex)); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/4823c8ed/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java new file mode 100644 index 0000000..6580bee --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.EntityResolver; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV2.class); + + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphDiscoveryContext discoveryContext; + + public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, EntityStream entityStream) { + this.typeRegistry = typeRegistry; + this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream); + } + + @Override + public void init() throws AtlasBaseException { + //Nothing to do + } + + @Override + public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException { + // walk through entities in stream and validate them; record entity references + discover(); + + // resolve entity references discovered in previous step + resolveReferences(); + + return discoveryContext; + } + + @Override + public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException { + List<String> messages = new ArrayList<>(); + + if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid()); + } + + AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); + } + + type.validateValue(entity, entity.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); + } + + type.getNormalizedValue(entity); + } + + @Override + public void validateAndNormalizeForUpdate(AtlasEntity entity) throws AtlasBaseException { + List<String> messages = new ArrayList<>(); + + if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid()); + } + + AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); + } + + type.validateValueForUpdate(entity, entity.getTypeName(), messages); + + if (!messages.isEmpty()) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); + } + + type.getNormalizedValueForUpdate(entity); + } + + @Override + public void cleanUp() throws AtlasBaseException { + discoveryContext.cleanUp(); + } + + + protected void discover() throws AtlasBaseException { + EntityStream entityStream = discoveryContext.getEntityStream(); + + Set<String> walkedEntities = new HashSet<>(); + + // walk through top-level entities and find entity references + while (entityStream.hasNext()) { + AtlasEntity entity = entityStream.next(); + + if (entity == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity"); + } + + walkEntityGraph(entity); + + walkedEntities.add(entity.getGuid()); + } + + // walk through entities referenced by other entities + // referencedGuids will be updated within this for() loop; avoid use of iterators + List<String> referencedGuids = discoveryContext.getReferencedGuids(); + for (int i = 0; i < referencedGuids.size(); i++) { + String guid = referencedGuids.get(i); + + if (walkedEntities.contains(guid)) { + continue; + } + + AtlasEntity entity = entityStream.getByGuid(guid); + + if (entity != null) { + walkEntityGraph(entity); + + walkedEntities.add(entity.getGuid()); + } + } + } + + protected void resolveReferences() throws AtlasBaseException { + EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry), + new UniqAttrBasedEntityResolver(typeRegistry) + }; + + for (EntityResolver resolver : entityResolvers) { + resolver.resolveEntityReferences(discoveryContext); + } + } + + private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException { + if (type == null || val == null) { + return; + } + + if (val instanceof AtlasObjectId) { + AtlasObjectId objId = (AtlasObjectId)val; + + if (!AtlasTypeUtil.isValid(objId)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); + } + + recordObjectReference(objId); + } else if (val instanceof Map) { + AtlasObjectId objId = new AtlasObjectId((Map)val); + + if (!AtlasTypeUtil.isValid(objId)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString()); + } + + recordObjectReference(objId); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString()); + } + } + + void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException { + if (attrType == null || val == null) { + return; + } + + switch (attrType.getTypeCategory()) { + case PRIMITIVE: + case ENUM: + return; + + case ARRAY: { + AtlasArrayType arrayType = (AtlasArrayType) attrType; + AtlasType elemType = arrayType.getElementType(); + + visitCollectionReferences(elemType, val); + } + break; + + case MAP: { + AtlasType keyType = ((AtlasMapType) attrType).getKeyType(); + AtlasType valueType = ((AtlasMapType) attrType).getValueType(); + + visitMapReferences(keyType, valueType, val); + } + break; + + case STRUCT: + visitStruct((AtlasStructType)attrType, val); + break; + + case OBJECT_ID_TYPE: + visitReference((AtlasObjectIdType) attrType, val); + break; + + default: + throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, attrType.getTypeCategory().name()); + } + } + + void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException { + if (keyType == null || valueType == null || val == null) { + return; + } + + if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) { + return; + } + + if (Map.class.isAssignableFrom(val.getClass())) { + Iterator<Map.Entry> it = ((Map) val).entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + visitAttribute(keyType, e.getKey()); + visitAttribute(valueType, e.getValue()); + } + } + } + + void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException { + if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) { + return; + } + + Iterator it = null; + + if (val instanceof Collection) { + it = ((Collection) val).iterator(); + } else if (val instanceof Iterable) { + it = ((Iterable) val).iterator(); + } else if (val instanceof Iterator) { + it = (Iterator) val; + } + + if (it != null) { + while (it.hasNext()) { + Object elem = it.next(); + visitAttribute(elemType, elem); + } + } + } + + void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException { + if (structType == null || val == null) { + return; + } + + final AtlasStruct struct; + + if (val instanceof AtlasStruct) { + struct = (AtlasStruct) val; + } else if (val instanceof Map) { + Map attributes = AtlasTypeUtil.toStructAttributes((Map) val); + + struct = new AtlasStruct(structType.getTypeName(), attributes); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString()); + } + + visitStruct(structType, struct); + } + + void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException { + List<String> visitedAttributes = new ArrayList<>(); + + // visit relationship attributes + visitRelationships(entityType, entity, visitedAttributes); + + // visit struct attributes + for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { + AtlasType attrType = attribute.getAttributeType(); + String attrName = attribute.getName(); + Object attrVal = entity.getAttribute(attrName); + + if (entity.hasAttribute(attrName) && !visitedAttributes.contains(attrName)) { + visitAttribute(attrType, attrVal); + } + } + } + + private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException { + for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) { + AtlasType attrType = attribute.getAttributeType(); + String attrName = attribute.getName(); + Object attrVal = entity.getRelationshipAttribute(attrName); + + if (entity.hasRelationshipAttribute(attrName)) { + visitAttribute(attrType, attrVal); + + visitedAttributes.add(attrName); + } + } + } + + void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException { + for (AtlasAttribute attribute : structType.getAllAttributes().values()) { + AtlasType attrType = attribute.getAttributeType(); + Object attrVal = struct.getAttribute(attribute.getName()); + + visitAttribute(attrType, attrVal); + } + } + + void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException { + if (entity == null) { + return; + } + + AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (type == null) { + throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); + } + + recordObjectReference(entity.getGuid()); + + visitEntity(type, entity); + } + + + boolean isPrimitive(TypeCategory typeCategory) { + return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM; + } + + private void recordObjectReference(String guid) { + discoveryContext.addReferencedGuid(guid); + } + + private void recordObjectReference(AtlasObjectId objId) { + if (AtlasTypeUtil.isValidGuid(objId)) { + discoveryContext.addReferencedGuid(objId.getGuid()); + } else { + discoveryContext.addReferencedByUniqAttribs(objId); + } + } +}