Repository: atlas Updated Branches: refs/heads/master bcd5bb607 -> 1deb8c16e
ATLAS-2715: Create audit events for term-entity association and disassociation Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/1deb8c16 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/1deb8c16 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/1deb8c16 Branch: refs/heads/master Commit: 1deb8c16ec8eca68962eff7f8e4d6dac81fd37aa Parents: bcd5bb6 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Wed May 23 15:02:36 2018 -0700 Committer: Sarath Subramanian <ssubraman...@hortonworks.com> Committed: Wed May 23 21:38:03 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/EntityAuditEvent.java | 7 +- dashboardv2/public/js/utils/Enums.js | 4 +- .../atlas/listener/EntityChangeListenerV2.java | 18 ++++++ .../atlas/model/audit/EntityAuditEventV2.java | 7 +- .../atlas/model/glossary/AtlasGlossaryTerm.java | 11 ++++ .../apache/atlas/glossary/GlossaryService.java | 41 +++++++----- .../repository/audit/EntityAuditListener.java | 38 +++++++++++ .../repository/audit/EntityAuditListenerV2.java | 67 ++++++++++++++++++-- .../audit/HBaseBasedAuditRepository.java | 28 +++++--- .../graph/v1/AtlasEntityChangeNotifier.java | 54 ++++++++++++++++ .../atlas/listener/EntityChangeListener.java | 19 ++++++ .../NoOpNotificationChangeListener.java | 11 ++++ .../EntityNotificationListenerV2.java | 12 ++++ .../NotificationEntityChangeListener.java | 11 ++++ 14 files changed, 294 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java ---------------------------------------------------------------------- diff --git a/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java b/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java index 
7077e15..fcd6a62 100644 --- a/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java +++ b/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java @@ -48,7 +48,8 @@ public class EntityAuditEvent implements Serializable { public enum EntityAuditAction { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE, PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE, - ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE; + ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, + TERM_ADD, TERM_DELETE; public static EntityAuditAction fromString(String strValue) { switch (strValue) { @@ -79,6 +80,10 @@ public class EntityAuditEvent implements Serializable { return PROPAGATED_TAG_DELETE; case "PROPAGATED_TAG_UPDATE": return PROPAGATED_TAG_UPDATE; + case "TERM_ADD": + return TERM_ADD; + case "TERM_DELETE": + return TERM_DELETE; } throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." 
+ strValue); http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/dashboardv2/public/js/utils/Enums.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/utils/Enums.js b/dashboardv2/public/js/utils/Enums.js index 1e493e5..81de527 100644 --- a/dashboardv2/public/js/utils/Enums.js +++ b/dashboardv2/public/js/utils/Enums.js @@ -33,7 +33,9 @@ define(['require'], function(require) { PROPAGATED_CLASSIFICATION_UPDATE: "Propagated Classification Updated", ENTITY_IMPORT_CREATE: "Entity Created by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import", - ENTITY_IMPORT_DELETE: "Entity Deleted by import" + ENTITY_IMPORT_DELETE: "Entity Deleted by import", + TERM_ADD: "Term Added", + TERM_DELETE: "Term Deleted" } Enums.entityStateReadOnly = { http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java index 9c735a0..cccf0d4 100644 --- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java +++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java @@ -19,8 +19,10 @@ package org.apache.atlas.listener; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import java.util.List; @@ -78,4 +80,20 @@ public interface EntityChangeListenerV2 { * @throws AtlasBaseException if the listener notification fails */ void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException; + + /** + * This is upon 
adding a new term to an entity. + * + * @param term the term + * @param entities list of entities to which the term is assigned + */ + void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException; + + /** + * This is upon removing a term from an entity. + * + * @param term the term + * @param entities list of entities to which the term is assigned + */ + void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 787f5a9..649f11f 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -49,7 +49,8 @@ public class EntityAuditEventV2 implements Serializable { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, - PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE; + PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE, + TERM_ADD, TERM_DELETE; public static EntityAuditActionV2 fromString(String strValue) { switch (strValue) { @@ -80,6 +81,10 @@ public class EntityAuditEventV2 implements Serializable { return PROPAGATED_CLASSIFICATION_DELETE; case "PROPAGATED_CLASSIFICATION_UPDATE": return PROPAGATED_CLASSIFICATION_UPDATE; + case "TERM_ADD": + return TERM_ADD; + case "TERM_DELETE": + return TERM_DELETE; } throw new IllegalArgumentException("No enum constant " + 
EntityAuditActionV2.class.getCanonicalName() + "." + strValue); http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java b/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java index daf6c7b..578d4de 100644 --- a/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java +++ b/intg/src/main/java/org/apache/atlas/model/glossary/AtlasGlossaryTerm.java @@ -24,6 +24,7 @@ import org.apache.atlas.model.glossary.relations.AtlasGlossaryHeader; import org.apache.atlas.model.glossary.relations.AtlasRelatedTermHeader; import org.apache.atlas.model.glossary.relations.AtlasTermCategorizationHeader; import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.type.AtlasType; import org.apache.commons.collections.CollectionUtils; import java.util.HashMap; @@ -286,6 +287,16 @@ public class AtlasGlossaryTerm extends AtlasGlossaryBaseObject { } @JsonIgnore + public String toAuditString() { + AtlasGlossaryTerm t = new AtlasGlossaryTerm(); + t.setGuid(this.getGuid()); + t.setDisplayName(this.getDisplayName()); + t.setQualifiedName(this.getQualifiedName()); + + return AtlasType.toJson(t); + } + + @JsonIgnore public boolean hasTerms() { return hasTerms; } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java index a5499e0..3467259 100644 --- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java +++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java @@ 
-31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier; import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; @@ -56,24 +57,26 @@ import static org.apache.atlas.glossary.GlossaryUtils.getGlossarySkeleton; @Service public class GlossaryService { - private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class); - private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); + private static final Logger LOG = LoggerFactory.getLogger(GlossaryService.class); + private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled(); + private static final String QUALIFIED_NAME_ATTR = "qualifiedName"; - private static final String QUALIFIED_NAME_ATTR = "qualifiedName"; - - private final DataAccess dataAccess; - private final GlossaryTermUtils glossaryTermUtils; - private final GlossaryCategoryUtils glossaryCategoryUtils; - private final AtlasTypeRegistry atlasTypeRegistry; + private final DataAccess dataAccess; + private final GlossaryTermUtils glossaryTermUtils; + private final GlossaryCategoryUtils glossaryCategoryUtils; + private final AtlasTypeRegistry atlasTypeRegistry; + private final AtlasEntityChangeNotifier entityChangeNotifier; private final char[] invalidNameChars = {'@', '.'}; @Inject - public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore, final AtlasTypeRegistry typeRegistry) { - this.dataAccess = dataAccess; - this.atlasTypeRegistry = typeRegistry; - glossaryTermUtils = new GlossaryTermUtils(relationshipStore, typeRegistry, dataAccess); - glossaryCategoryUtils = new GlossaryCategoryUtils(relationshipStore, 
typeRegistry, dataAccess); + public GlossaryService(DataAccess dataAccess, final AtlasRelationshipStore relationshipStore, + final AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) { + this.dataAccess = dataAccess; + atlasTypeRegistry = typeRegistry; + glossaryTermUtils = new GlossaryTermUtils(relationshipStore, typeRegistry, dataAccess); + glossaryCategoryUtils = new GlossaryCategoryUtils(relationshipStore, typeRegistry, dataAccess); + this.entityChangeNotifier = entityChangeNotifier; } /** @@ -477,24 +480,32 @@ public class GlossaryService { if (DEBUG_ENABLED) { LOG.debug("==> GlossaryService.assignTermToEntities({}, {})", termGuid, relatedObjectIds); } + AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid)); + glossaryTermUtils.processTermAssignments(glossaryTerm, relatedObjectIds); + entityChangeNotifier.onTermAddedToEntities(glossaryTerm, relatedObjectIds); + if (DEBUG_ENABLED) { LOG.debug("<== GlossaryService.assignTermToEntities()"); } + } @GraphTransaction public void removeTermFromEntities(String termGuid, List<AtlasRelatedObjectId> relatedObjectIds) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { + if (DEBUG_ENABLED) { LOG.debug("==> GlossaryService.removeTermFromEntities({}, {})", termGuid, relatedObjectIds); } AtlasGlossaryTerm glossaryTerm = dataAccess.load(getAtlasGlossaryTermSkeleton(termGuid)); + glossaryTermUtils.processTermDissociation(glossaryTerm, relatedObjectIds); - if (LOG.isDebugEnabled()) { + entityChangeNotifier.onTermDeletedFromEntities(glossaryTerm, relatedObjectIds); + + if (DEBUG_ENABLED) { LOG.debug("<== GlossaryService.removeTermFromEntities()"); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index a085e8e..7369c5e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -23,6 +23,7 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.type.AtlasEntityType; @@ -43,6 +44,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_ADD; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_DELETE; + /** * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository. 
*/ @@ -127,6 +131,28 @@ public class EntityAuditListener implements EntityChangeListener { auditRepository.putEventsV1(events); } + @Override + public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + List<EntityAuditEvent> events = new ArrayList<>(); + + for (Referenceable entity : entities) { + events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString())); + } + + auditRepository.putEventsV1(events); + } + + @Override + public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + List<EntityAuditEvent> events = new ArrayList<>(); + + for (Referenceable entity : entities) { + events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString())); + } + + auditRepository.putEventsV1(events); + } + public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{ return auditRepository.listEventsV1(guid, null, (short) 10); } @@ -290,6 +316,12 @@ public class EntityAuditListener implements EntityChangeListener { case ENTITY_IMPORT_DELETE: ret = "Deleted by import: "; break; + case TERM_ADD: + ret = "Added term: "; + break; + case TERM_DELETE: + ret = "Deleted term: "; + break; default: ret = "Unknown: "; } @@ -328,6 +360,12 @@ public class EntityAuditListener implements EntityChangeListener { case ENTITY_IMPORT_DELETE: ret = "Deleted by import: "; break; + case TERM_ADD: + ret = "Added term: "; + break; + case TERM_DELETE: + ret = "Deleted term: "; + break; default: ret = "Unknown: "; } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index 970b14a..a988e47 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -17,15 +17,18 @@ */ package org.apache.atlas.repository.audit; -import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasType; @@ -56,18 +59,22 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.TERM_DELETE; @Component public class EntityAuditListenerV2 implements EntityChangeListenerV2 { private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class); - private final 
EntityAuditRepository auditRepository; - private final AtlasTypeRegistry typeRegistry; + private final EntityAuditRepository auditRepository; + private final AtlasTypeRegistry typeRegistry; + private final AtlasInstanceConverter instanceConverter; @Inject - public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry) { - this.auditRepository = auditRepository; - this.typeRegistry = typeRegistry; + public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter) { + this.auditRepository = auditRepository; + this.typeRegistry = typeRegistry; + this.instanceConverter = instanceConverter; } @Override @@ -167,6 +174,42 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { } } + @Override + public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException { + if (term != null && CollectionUtils.isNotEmpty(entities)) { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasRelatedObjectId relatedObjectId : entities) { + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid()); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + + if (entity != null) { + events.add(createEvent(entity, TERM_ADD, "Added term: " + term.toAuditString())); + } + } + + auditRepository.putEventsV2(events); + } + } + + @Override + public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException { + if (term != null && CollectionUtils.isNotEmpty(entities)) { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasRelatedObjectId relatedObjectId : entities) { + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(relatedObjectId.getGuid()); + AtlasEntity entity = (entityWithExtInfo != null) ? 
entityWithExtInfo.getEntity() : null; + + if (entity != null) { + events.add(createEvent(entity, TERM_DELETE, "Deleted term: " + term.toAuditString())); + } + } + + auditRepository.putEventsV2(events); + } + } + private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) { return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(), RequestContextV1.get().getUser(), action, details, entity); @@ -310,6 +353,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { case ENTITY_IMPORT_DELETE: ret = "Deleted by import: "; break; + case TERM_ADD: + ret = "Added term: "; + break; + case TERM_DELETE: + ret = "Deleted term: "; + break; default: ret = "Unknown: "; } @@ -348,6 +397,12 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { case ENTITY_IMPORT_DELETE: ret = "Deleted by import: "; break; + case TERM_ADD: + ret = "Added term: "; + break; + case TERM_DELETE: + ret = "Deleted term: "; + break; default: ret = "Unknown: "; } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 3656b85..65686ea 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -70,6 +70,8 @@ import java.util.List; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE; +import static 
org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_ADD; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TERM_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2; @@ -319,19 +321,25 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito if (StringUtils.isNotEmpty(v1DetailsWithPrefix)) { EntityAuditAction v1AuditAction = EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION)); - String v1AuditPrefix = EntityAuditListener.getV1AuditPrefix(v1AuditAction); - String[] split = v1DetailsWithPrefix.split(v1AuditPrefix); - if (ArrayUtils.isNotEmpty(split) && split.length == 2) { - String v1AuditDetails = split[1]; - Referenceable referenceable = AtlasType.fromV1Json(v1AuditDetails, Referenceable.class); - String v2Json = (referenceable != null) ? toV2Json(referenceable, v1AuditAction) : v1AuditDetails; + if (v1AuditAction == TERM_ADD || v1AuditAction == TERM_DELETE) { + // for terms audit v1 and v2 structure is same + ret = v1DetailsWithPrefix; + } else { + String v1AuditPrefix = EntityAuditListener.getV1AuditPrefix(v1AuditAction); + String[] split = v1DetailsWithPrefix.split(v1AuditPrefix); + + if (ArrayUtils.isNotEmpty(split) && split.length == 2) { + String v1AuditDetails = split[1]; + Referenceable referenceable = AtlasType.fromV1Json(v1AuditDetails, Referenceable.class); + String v2Json = (referenceable != null) ? 
toV2Json(referenceable, v1AuditAction) : v1AuditDetails; - if (v2Json != null) { - ret = getV2AuditPrefix(v1AuditAction) + v2Json; + if (v2Json != null) { + ret = getV2AuditPrefix(v1AuditAction) + v2Json; + } + } else { + ret = v1DetailsWithPrefix; } - } else { - ret = v1DetailsWithPrefix; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 083600e..0e90336 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -25,11 +25,13 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.type.AtlasEntityType; @@ -192,6 +194,44 @@ public class AtlasEntityChangeNotifier { } } + public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + // listeners 
notified on term-entity association: v2 listeners when v2 notifications are enabled, v1 listeners otherwise + if (isV2EntityNotificationEnabled()) { + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onTermAdded(term, entityIds); + } + } else { + List<Referenceable> entityRefs = toReferenceables(entityIds); + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTermAdded(entityRefs, term); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermAdd"); + } + } + } + } + + public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + // listeners notified on term-entity disassociation: v2 listeners when v2 notifications are enabled, v1 listeners otherwise + if (isV2EntityNotificationEnabled()) { + for (EntityChangeListenerV2 listener : entityChangeListenersV2) { + listener.onTermDeleted(term, entityIds); + } + } else { + List<Referenceable> entityRefs = toReferenceables(entityIds); + + for (EntityChangeListener listener : entityChangeListeners) { + try { + listener.onTermDeleted(entityRefs, term); + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermDelete"); + } + } + } + } + public void notifyPropagatedEntities() throws AtlasBaseException { RequestContextV1 context = RequestContextV1.get(); Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); @@ -297,6 +337,20 @@ public class AtlasEntityChangeNotifier { return ret; } + private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { + List<Referenceable> ret = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(entityIds)) { + for (AtlasRelatedObjectId relatedObjectId : entityIds) { + String entityGuid = relatedObjectId.getGuid(); + + ret.add(toReferenceable(entityGuid)); + } + } + + return ret; + } + private
Referenceable toReferenceable(String entityId) throws AtlasBaseException { Referenceable ret = null; http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java index ab379e0..e24f582 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java +++ b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java @@ -19,6 +19,7 @@ package org.apache.atlas.listener; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; @@ -86,4 +87,22 @@ public interface EntityChangeListener { * @throws AtlasException */ void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException; + + /** + * This is upon adding a new term to a list of typed instances. + * + * @param entities entity list + * @param term term that needs to be added to entity + * @throws AtlasException if the listener notification fails + */ + void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException; + + /** + * This is upon removing a term from a list of typed instances.
+ * + * @param entities entity list + * @param term term that needs to be removed from entity + * @throws AtlasException if the listener notification fails + */ + void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException; } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java ---------------------------------------------------------------------- diff --git a/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java b/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java index 9af7819..782ac4b 100644 --- a/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java +++ b/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java @@ -20,6 +20,7 @@ package org.apache.atlas.migration; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.springframework.stereotype.Component; @@ -57,4 +58,14 @@ public class NoOpNotificationChangeListener implements EntityChangeListener { public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException { } + + @Override + public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + + } + + @Override + public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java index 9de49d4..017de99 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -21,9 +21,11 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2; import org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType; import org.apache.atlas.type.AtlasClassificationType; @@ -102,6 +104,16 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE); } + @Override + public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) { + // do nothing -> notification not sent out for term assignment to entities + } + + @Override + public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) { + // do nothing -> notification not sent out for term removal from entities + } + private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException { List<EntityNotificationV2> messages = new ArrayList<>(); 
http://git-wip-us.apache.org/repos/asf/atlas/blob/1deb8c16/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index dbdf67d..1eeecef 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.model.glossary.AtlasGlossaryTerm; import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; @@ -99,6 +100,16 @@ public class NotificationEntityChangeListener implements EntityChangeListener { notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE); } + @Override + public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + // do nothing + } + + @Override + public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException { + // do nothing + } + // ----- helper methods -------------------------------------------------