Repository: atlas Updated Branches: refs/heads/master 30dd2f5b8 -> 2237a895b
ATLAS-2609: Update audit and notification listener to handle propagated classification add/delete/update Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2237a895 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2237a895 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2237a895 Branch: refs/heads/master Commit: 2237a895bb259a10440829bd348460b6eb5b7b59 Parents: 30dd2f5 Author: Sarath Subramanian <[email protected]> Authored: Wed Apr 25 13:27:09 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Wed Apr 25 21:39:19 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/EntityAuditEvent.java | 7 + dashboardv2/public/js/utils/Enums.js | 3 + .../atlas/listener/EntityChangeListenerV2.java | 4 +- .../atlas/model/audit/EntityAuditEventV2.java | 9 +- .../AbstractStorageBasedAuditRepository.java | 8 +- .../repository/audit/EntityAuditListener.java | 8 +- .../repository/audit/EntityAuditListenerV2.java | 66 ++++++- .../audit/HBaseBasedAuditRepository.java | 23 ++- .../converters/AtlasInstanceConverter.java | 12 ++ .../graph/v1/AtlasEntityChangeNotifier.java | 47 ++++- .../graph/v1/AtlasRelationshipStoreV1.java | 39 +++- .../store/graph/v1/DeleteHandlerV1.java | 190 +++++++++++++++++-- .../store/graph/v1/EntityGraphMapper.java | 83 ++++---- .../store/graph/v1/EntityGraphRetriever.java | 150 --------------- .../graph/v1/AtlasRelationshipStoreV1Test.java | 5 +- .../java/org/apache/atlas/RequestContextV1.java | 50 ++++- .../atlas/listener/EntityChangeListener.java | 4 +- .../NoOpNotificationChangeListener.java | 2 +- .../EntityNotificationListenerV2.java | 2 +- .../NotificationEntityChangeListener.java | 2 +- 20 files changed, 461 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java ---------------------------------------------------------------------- diff --git a/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java b/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java index 3cf3e21..7077e15 100644 --- a/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java +++ b/client/client-v1/src/main/java/org/apache/atlas/EntityAuditEvent.java @@ -47,6 +47,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ public class EntityAuditEvent implements Serializable { public enum EntityAuditAction { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE, TAG_UPDATE, + PROPAGATED_TAG_ADD, PROPAGATED_TAG_DELETE, PROPAGATED_TAG_UPDATE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE; public static EntityAuditAction fromString(String strValue) { @@ -72,6 +73,12 @@ public class EntityAuditEvent implements Serializable { case "CLASSIFICATION_UPDATE": case "TAG_UPDATE": return TAG_UPDATE; + case "PROPAGATED_TAG_ADD": + return PROPAGATED_TAG_ADD; + case "PROPAGATED_TAG_DELETE": + return PROPAGATED_TAG_DELETE; + case "PROPAGATED_TAG_UPDATE": + return PROPAGATED_TAG_UPDATE; } throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." 
+ strValue); http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/dashboardv2/public/js/utils/Enums.js ---------------------------------------------------------------------- diff --git a/dashboardv2/public/js/utils/Enums.js b/dashboardv2/public/js/utils/Enums.js index 8c0518b..ca8608f 100644 --- a/dashboardv2/public/js/utils/Enums.js +++ b/dashboardv2/public/js/utils/Enums.js @@ -28,6 +28,9 @@ define(['require'], function(require) { TAG_ADD: "Classification Added", TAG_DELETE: "Classification Deleted", TAG_UPDATE: "Classification Updated", + PROPAGATED_TAG_ADD: "Propagated Classification Added", + PROPAGATED_TAG_DELETE: "Propagated Classification Deleted", + PROPAGATED_TAG_UPDATE: "Propagated Classification Updated", ENTITY_IMPORT_CREATE: "Entity Created by import", ENTITY_IMPORT_UPDATE: "Entity Updated by import", ENTITY_IMPORT_DELETE: "Entity Deleted by import" http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java index 70877d2..9c735a0 100644 --- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java +++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java @@ -74,8 +74,8 @@ public interface EntityChangeListenerV2 { * This is upon deleting classifications from an entity. 
* * @param entity the entity - * @param classificationNames classifications names for the instance that needs to be deleted from entity + * @param classifications classifications that needs to be updated for an entity * @throws AtlasBaseException if the listener notification fails */ - void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException; + void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index 1045c77..d14f6ae 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -45,7 +45,8 @@ public class EntityAuditEventV2 implements Serializable { public enum EntityAuditAction { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, - CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE; + CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, + PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE; public static EntityAuditAction fromString(String strValue) { switch (strValue) { @@ -70,6 +71,12 @@ public class EntityAuditEventV2 implements Serializable { case "CLASSIFICATION_UPDATE": case "TAG_UPDATE": return CLASSIFICATION_UPDATE; + case "PROPAGATED_CLASSIFICATION_ADD": + return PROPAGATED_CLASSIFICATION_ADD; + case "PROPAGATED_CLASSIFICATION_DELETE": + return PROPAGATED_CLASSIFICATION_DELETE; + case "PROPAGATED_CLASSIFICATION_UPDATE": 
+ return PROPAGATED_CLASSIFICATION_UPDATE; } throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue); http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java index af6c4e6..39b1ef2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java @@ -156,10 +156,10 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En APPLICATION_PROPERTIES = config; } - protected byte[] getKey(String id, Long ts) { - assert id != null : "entity id can't be null"; - assert ts != null : "timestamp can't be null"; - String keyStr = id + FIELD_SEPARATOR + ts; + protected byte[] getKey(String id, Long ts, int index) { + assert id != null : "entity id can't be null"; + assert ts != null : "timestamp can't be null"; + String keyStr = id + FIELD_SEPARATOR + ts + FIELD_SEPARATOR + index; return Bytes.toBytes(keyStr); } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 1c04eea..6e868e6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -94,10 +94,10 @@ public class EntityAuditListener implements EntityChangeListener { } @Override - public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException { - if (traitNames != null) { - for (String traitName : traitNames) { - EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); + public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException { + if (traits != null) { + for (Struct trait : traits) { + EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + trait.getTypeName()); auditRepository.putEventsV1(event); } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index bb51014..4fd2fd9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -30,6 +30,7 @@ import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -50,6 +51,9 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction. 
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE; @Component public class EntityAuditListenerV2 implements EntityChangeListenerV2 { @@ -109,7 +113,11 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { List<EntityAuditEventV2> events = new ArrayList<>(); for (AtlasClassification classification : classifications) { - events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification))); + if (entity.getGuid().equals(classification.getEntityGuid())) { + events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification))); + } else { + events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification))); + } } auditRepository.putEventsV2(events); @@ -120,9 +128,20 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { if (CollectionUtils.isNotEmpty(classifications)) { List<EntityAuditEventV2> events = new ArrayList<>(); + String guid = entity.getGuid(); for (AtlasClassification classification : classifications) { - events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification))); + if 
(guid.equals(classification.getEntityGuid())) { + events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification))); + } else { + if (isPropagatedClassificationAdded(guid, classification)) { + events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + AtlasType.toJson(classification))); + } else if (isPropagatedClassificationDeleted(guid, classification)) { + events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName())); + } else { + events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + AtlasType.toJson(classification))); + } + } } auditRepository.putEventsV2(events); @@ -130,12 +149,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { } @Override - public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException { - if (CollectionUtils.isNotEmpty(classificationNames)) { + public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classifications)) { List<EntityAuditEventV2> events = new ArrayList<>(); - for (String classificationName : classificationNames) { - events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classificationName)); + for (AtlasClassification classification : classifications) { + if (StringUtils.equals(entity.getGuid(), classification.getEntityGuid())) { + events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classification.getTypeName())); + } else { + events.add(createEvent(entity, PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + classification.getTypeName())); + } } auditRepository.putEventsV2(events); @@ -180,6 +203,37 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 { return auditString; } + private boolean isPropagatedClassificationAdded(String guid, AtlasClassification classification) { + Map<String, List<AtlasClassification>> addedPropagations = RequestContextV1.get().getAddedPropagations(); + + return hasPropagatedEntry(addedPropagations, guid, classification); + } + + private boolean isPropagatedClassificationDeleted(String guid, AtlasClassification classification) { + Map<String, List<AtlasClassification>> removedPropagations = RequestContextV1.get().getRemovedPropagations(); + + return hasPropagatedEntry(removedPropagations, guid, classification); + } + + private boolean hasPropagatedEntry(Map<String, List<AtlasClassification>> propagationsMap, String guid, AtlasClassification classification) { + boolean ret = false; + + if (MapUtils.isNotEmpty(propagationsMap) && propagationsMap.containsKey(guid) && CollectionUtils.isNotEmpty(propagationsMap.get(guid))) { + List<AtlasClassification> classifications = propagationsMap.get(guid); + String classificationName = classification.getTypeName(); + String entityGuid = classification.getEntityGuid(); + + for (AtlasClassification c : classifications) { + if (StringUtils.equals(c.getTypeName(), classificationName) && StringUtils.equals(c.getEntityGuid(), entityGuid)) { + ret = true; + break; + } + } + } + + return ret; + } + private Map<String, Object> pruneEntityAttributesForAudit(AtlasEntity entity) { Map<String, Object> ret = null; Map<String, Object> entityAttributes = entity.getAttributes(); http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index c5967b2..e55864b 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -27,9 +27,6 @@ import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; -import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.service.Service; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -57,11 +54,8 @@ import javax.inject.Singleton; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; /** * HBase based repository for entity audit events @@ -110,9 +104,12 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito try { table = connection.getTable(tableName); List<Put> puts = new ArrayList<>(events.size()); - for (EntityAuditEvent event : events) { + + for (int index = 0; index < events.size(); index++) { + EntityAuditEvent event = events.get(index); + LOG.debug("Adding entity audit event {}", event); - Put put = new Put(getKey(event.getEntityId(), event.getTimestamp())); + Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index)); addColumn(put, COLUMN_ACTION, event.getAction()); addColumn(put, COLUMN_USER, event.getUser()); addColumn(put, COLUMN_DETAIL, event.getDetails()); @@ -141,12 +138,14 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito table = connection.getTable(tableName); List<Put> puts = new ArrayList<>(events.size()); - for (EntityAuditEventV2 event : events) { + for (int index 
= 0; index < events.size(); index++) { + EntityAuditEventV2 event = events.get(index); + if (LOG.isDebugEnabled()) { LOG.debug("Adding entity audit event {}", event); } - Put put = new Put(getKey(event.getEntityId(), event.getTimestamp())); + Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index)); addColumn(put, COLUMN_ACTION, event.getAction()); addColumn(put, COLUMN_USER, event.getUser()); @@ -197,7 +196,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito if (StringUtils.isEmpty(startKey)) { //Set start row to entity id + max long value - byte[] entityBytes = getKey(entityId, Long.MAX_VALUE); + byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE); scan = scan.setStartRow(entityBytes); } else { scan = scan.setStartRow(Bytes.toBytes(startKey)); @@ -287,7 +286,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito .setSmall(true); if (StringUtils.isEmpty(startKey)) { //Set start row to entity id + max long value - byte[] entityBytes = getKey(entityId, Long.MAX_VALUE); + byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE); scan = scan.setStartRow(entityBytes); } else { scan = scan.setStartRow(Bytes.toBytes(startKey)); http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 20147a0..eb58b5e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -375,6 +375,12 @@ public class AtlasInstanceConverter { return 
EntityAuditEvent.EntityAuditAction.TAG_DELETE; case CLASSIFICATION_UPDATE: return EntityAuditEvent.EntityAuditAction.TAG_UPDATE; + case PROPAGATED_CLASSIFICATION_ADD: + return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_ADD; + case PROPAGATED_CLASSIFICATION_DELETE: + return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_DELETE; + case PROPAGATED_CLASSIFICATION_UPDATE: + return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_UPDATE; } return null; @@ -400,6 +406,12 @@ public class AtlasInstanceConverter { return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE; case TAG_UPDATE: return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE; + case PROPAGATED_TAG_ADD: + return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; + case PROPAGATED_TAG_DELETE: + return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; + case PROPAGATED_TAG_UPDATE: + return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE; } return null; http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 9aebde2..46b17c0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -20,9 +20,11 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; 
import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; @@ -41,6 +43,7 @@ import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.util.AtlasRepositoryConfiguration; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +53,12 @@ import javax.inject.Inject; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; @@ -100,6 +106,8 @@ public class AtlasEntityChangeNotifier { notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport); notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport); notifyListeners(deletedEntities, EntityOperation.DELETE, isImport); + + notifyPropagatedEntities(); } public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { @@ -156,25 +164,26 @@ public class AtlasEntityChangeNotifier { } } - public void onClassificationDeletedFromEntity(AtlasEntity entity, List<String> deletedClassificationNames) throws AtlasBaseException { + public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { if (isV2EntityNotificationEnabled()) 
{ doFullTextMapping(entity.getGuid()); for (EntityChangeListenerV2 listener : entityChangeListenersV2) { - listener.onClassificationsDeleted(entity, deletedClassificationNames); + listener.onClassificationsDeleted(entity, deletedClassifications); } } else { doFullTextMapping(entity.getGuid()); Referenceable entityRef = toReferenceable(entity.getGuid()); + List<Struct> traits = toStruct(deletedClassifications); - if (entityRef == null || CollectionUtils.isEmpty(deletedClassificationNames)) { + if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) { return; } for (EntityChangeListener listener : entityChangeListeners) { try { - listener.onTraitsDeleted(entityRef, deletedClassificationNames); + listener.onTraitsDeleted(entityRef, traits); } catch (AtlasException e) { throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete"); } @@ -183,6 +192,36 @@ public class AtlasEntityChangeNotifier { } } + public void notifyPropagatedEntities() throws AtlasBaseException { + RequestContextV1 context = RequestContextV1.get(); + Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); + Map<String, List<AtlasClassification>> removedPropagations = context.getRemovedPropagations(); + + notifyPropagatedEntities(addedPropagations, PROPAGATED_CLASSIFICATION_ADD); + notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE); + } + + private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditAction action) throws AtlasBaseException { + if (MapUtils.isEmpty(entityPropagationMap) || action == null) { + return; + } + + for (String guid : entityPropagationMap.keySet()) { + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid); + AtlasEntity entity = entityWithExtInfo != null ? 
entityWithExtInfo.getEntity() : null; + + if (entity == null) { + continue; + } + + if (action == PROPAGATED_CLASSIFICATION_ADD) { + onClassificationAddedToEntity(entity, entityPropagationMap.get(guid)); + } else if (action == PROPAGATED_CLASSIFICATION_DELETE) { + onClassificationDeletedFromEntity(entity, entityPropagationMap.get(guid)); + } + } + } + private String getListenerName(EntityChangeListener listener) { return listener.getClass().getSimpleName(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java index 51dcc3a..ab15d95 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java @@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; @@ -80,16 +81,18 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L; - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityRetriever; - private final DeleteHandlerV1 deleteHandler; - private final GraphHelper graphHelper = GraphHelper.getInstance(); + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + private final DeleteHandlerV1 deleteHandler; + private final GraphHelper 
graphHelper = GraphHelper.getInstance(); + private final AtlasEntityChangeNotifier entityChangeNotifier; @Inject - public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler) { - this.typeRegistry = typeRegistry; - this.entityRetriever = new EntityGraphRetriever(typeRegistry); - this.deleteHandler = deleteHandler; + public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) { + this.typeRegistry = typeRegistry; + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + this.deleteHandler = deleteHandler; + this.entityChangeNotifier = entityChangeNotifier; } @Override @@ -112,6 +115,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { LOG.debug("<== create({}): {}", relationship, ret); } + // notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + return ret; } @@ -181,6 +187,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { AtlasRelationship ret = updateRelationship(edge, relationship); + // notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + if (LOG.isDebugEnabled()) { LOG.debug("<== update({}): {}", relationship, ret); } @@ -229,8 +238,20 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid); } + // remove tag propagations + List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge); + + for (AtlasVertex classificationVertex : propagatedClassificationVertices) { + List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex); + + deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices); + } + deleteHandler.deleteRelationships(Collections.singleton(edge)); + // 
notify entities for added/removed classification propagation + entityChangeNotifier.notifyPropagatedEntities(); + if (LOG.isDebugEnabled()) { LOG.debug("<== deleteById({}): {}", guid); } @@ -698,7 +719,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore { handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications()); // propagate tags - entityRetriever.addTagPropagation(ret, tagPropagation); + deleteHandler.addTagPropagation(ret, tagPropagation); } return ret; http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index c0f2fc3..abaf509 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -23,8 +23,10 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.AtlasEdgeLabel; @@ -50,18 +52,24 @@ import java.util.*; import static org.apache.atlas.model.instance.AtlasClassification.PropagationState.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; +import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; import static org.apache.atlas.repository.graph.GraphHelper.addToPropagatedTraitNames; +import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdgeState; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdges; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedEdges; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagationEnabledClassificationVertices; +import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid; import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge; @@ -326,39 +334,102 @@ public abstract class DeleteHandlerV1 { return !softDelete || forceDelete; } - public List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) { + public void 
addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { + if (edge == null) { + return; + } + + AtlasVertex outVertex = edge.getOutVertex(); + AtlasVertex inVertex = edge.getInVertex(); + + if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { + addTagPropagation(outVertex, inVertex, edge); + } + + if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { + addTagPropagation(inVertex, outVertex, edge); + } + } + + private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { + final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); + final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; + + if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size()); + } + + for (AtlasVertex classificationVertex : classificationVertices) { + addTagPropagation(classificationVertex, propagatedEntityVertices); + } + } + } + + public List<AtlasVertex> addTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> propagatedEntityVertices) throws AtlasBaseException { List<AtlasVertex> ret = null; if (CollectionUtils.isNotEmpty(propagatedEntityVertices) && classificationVertex != null) { - String classificationName = getTypeName(classificationVertex); - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); + String classificationName = getTypeName(classificationVertex); + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); + AtlasVertex associatedEntityVertex 
= getAssociatedEntityVertex(classificationVertex); for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) { - AtlasEdge existingEdge = getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex); + if (getClassificationEdge(propagatedEntityVertex, classificationVertex) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", + getTypeName(propagatedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), classificationName); + } + + continue; + } else if (getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", + getTypeName(propagatedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); + } - if (existingEdge != null) { continue; } String entityTypeName = getTypeName(propagatedEntityVertex); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + String entityGuid = getGuid(propagatedEntityVertex); - if (classificationType.canApplyToEntityType(entityType)) { + if (!classificationType.canApplyToEntityType(entityType)) { if (LOG.isDebugEnabled()) { - LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex), - GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL); + LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type", + getTypeName(propagatedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex)); } - if (ret == null) { - ret = new ArrayList<>(); - } + continue; + } - ret.add(propagatedEntityVertex); + AtlasEdge existingEdge = 
getPropagatedClassificationEdge(propagatedEntityVertex, classificationVertex); - graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true); + if (existingEdge != null) { + continue; + } - addToPropagatedTraitNames(propagatedEntityVertex, classificationName); + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Adding propagated classification: [{}] to {} ({}) using edge label: [{}]", classificationName, getTypeName(propagatedEntityVertex), + GraphHelper.getGuid(propagatedEntityVertex), CLASSIFICATION_LABEL); } + + if (ret == null) { + ret = new ArrayList<>(); + } + + ret.add(propagatedEntityVertex); + + graphHelper.addClassificationEdge(propagatedEntityVertex, classificationVertex, true); + + addToPropagatedTraitNames(propagatedEntityVertex, classificationName); + + // record add propagation details to send notifications at the end + RequestContextV1 context = RequestContextV1.get(); + AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); + + context.recordAddedPropagation(entityGuid, classification); } } @@ -372,10 +443,17 @@ public abstract class DeleteHandlerV1 { List<AtlasEdge> propagatedEdges = getPropagatedEdges(classificationVertex); if (CollectionUtils.isNotEmpty(propagatedEdges)) { + AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); + for (AtlasEdge propagatedEdge : propagatedEdges) { - deletePropagatedEdge(propagatedEdge); + AtlasVertex entityVertex = propagatedEdge.getOutVertex(); + + ret.add(entityVertex); - ret.add(propagatedEdge.getOutVertex()); + // record remove propagation details to send notifications at the end + RequestContextV1.get().recordRemovedPropagation(getGuid(entityVertex), classification); + + deletePropagatedEdge(propagatedEdge); } } } @@ -385,14 +463,90 @@ public abstract class DeleteHandlerV1 { public void removeTagPropagation(AtlasVertex classificationVertex, List<AtlasVertex> entityVertices) throws AtlasBaseException { 
if (classificationVertex != null && CollectionUtils.isNotEmpty(entityVertices)) { - String classificationName = getClassificationName(classificationVertex); - String entityGuid = getClassificationEntityGuid(classificationVertex); + String classificationName = getClassificationName(classificationVertex); + AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); + String entityGuid = getClassificationEntityGuid(classificationVertex); + RequestContextV1 context = RequestContextV1.get(); for (AtlasVertex entityVertex : entityVertices) { AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, entityGuid); if (propagatedEdge != null) { deletePropagatedEdge(propagatedEdge); + + // record remove propagation details to send notifications at the end + context.recordRemovedPropagation(getGuid(entityVertex), classification); + } + } + } + } + + public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { + if (edge == null) { + return; + } + + AtlasVertex outVertex = edge.getOutVertex(); + AtlasVertex inVertex = edge.getInVertex(); + + if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { + removeTagPropagation(outVertex, inVertex, edge); + } + + if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { + removeTagPropagation(inVertex, outVertex, edge); + } + } + + private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { + final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); + final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? 
graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; + + if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size()); + } + + for (AtlasVertex classificationVertex : classificationVertices) { + String classificationName = getTypeName(classificationVertex); + AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex); + List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge)); + + for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { + if (referrals.contains(impactedEntityVertex)) { + if (LOG.isDebugEnabled()) { + if (org.apache.commons.lang3.StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex)); + } else { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName); + } + } + + continue; + } + + // remove propagated classification edge and classificationName from propagatedTraitNames vertex property + AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex); + + if (propagatedEdge != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Removing propagated classification edge from 
[{}] --> [{}][{}] with edge label: [{}]", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); + } + + graphHelper.removeEdge(propagatedEdge); + + if (getClassificationEdgeState(propagatedEdge) == ACTIVE) { + removeFromPropagatedTraitNames(impactedEntityVertex, classificationName); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist", + getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); + } + } } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 43e67df..c3b3cdb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -1462,34 +1462,47 @@ public class EntityGraphMapper { validateClassificationExists(traitNames, classificationNames); - Map<AtlasVertex, List<String>> removedClassifications = new HashMap<>(); + Map<AtlasVertex, List<AtlasClassification>> removedClassifications = new HashMap<>(); for (String classificationName : classificationNames) { - AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName); + AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName); + AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); // remove classification from 
propagated entities if propagation is turned on if (isPropagationEnabled(classificationVertex)) { - List<AtlasVertex> impactedVertices = deleteHandler.removeTagPropagation(classificationVertex); + List<AtlasVertex> propagatedEntityVertices = deleteHandler.removeTagPropagation(classificationVertex); - if (CollectionUtils.isNotEmpty(impactedVertices)) { - for (AtlasVertex impactedVertex : impactedVertices) { - List<String> classifications = removedClassifications.get(impactedVertex); + // add propagated entities and deleted classification details to removeClassifications map + if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) { + for (AtlasVertex propagatedEntityVertex : propagatedEntityVertices) { + List<AtlasClassification> classifications = removedClassifications.get(propagatedEntityVertex); if (classifications == null) { classifications = new ArrayList<>(); - removedClassifications.put(impactedVertex, classifications); + removedClassifications.put(propagatedEntityVertex, classifications); } - classifications.add(classificationName); + classifications.add(classification); } } } + // add associated entity and deleted classification details to removeClassifications map + List<AtlasClassification> classifications = removedClassifications.get(entityVertex); + + if (classifications == null) { + classifications = new ArrayList<>(); + + removedClassifications.put(entityVertex, classifications); + } + + classifications.add(classification); + // remove classifications from associated entity if (LOG.isDebugEnabled()) { LOG.debug("Removing classification: [{}] from: [{}][{}] with edge label: [{}]", classificationName, - getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL); + getTypeName(entityVertex), entityGuid, CLASSIFICATION_LABEL); } AtlasEdge edge = getClassificationEdge(entityVertex, classificationVertex); @@ -1499,17 +1512,15 @@ public class EntityGraphMapper { traitNames.remove(classificationName); } - removedClassifications.put(entityVertex, 
classificationNames); - updateTraitNamesProperty(entityVertex, traitNames); updateModificationMetadata(entityVertex); - for (Map.Entry<AtlasVertex, List<String>> entry : removedClassifications.entrySet()) { - String guid = GraphHelper.getGuid(entry.getKey()); - List<String> deletedClassificationNames = entry.getValue(); - AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid); - AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedClassifications.entrySet()) { + String guid = GraphHelper.getGuid(entry.getKey()); + List<AtlasClassification> deletedClassificationNames = entry.getValue(); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames); } @@ -1532,12 +1543,16 @@ public class EntityGraphMapper { List<AtlasVertex> entitiesToPropagateTo = new ArrayList<>(); Map<AtlasVertex, List<AtlasClassification>> addedPropagations = null; - Map<AtlasVertex, List<String>> removedPropagations = null; + Map<AtlasVertex, List<AtlasClassification>> removedPropagations = null; for (AtlasClassification classification : classifications) { String classificationName = classification.getTypeName(); String classificationEntityGuid = classification.getEntityGuid(); + if (StringUtils.isEmpty(classificationEntityGuid)) { + classification.setEntityGuid(guid); + } + if (StringUtils.isNotEmpty(classificationEntityGuid) && !StringUtils.equalsIgnoreCase(guid, classificationEntityGuid)) { throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName); } @@ -1597,7 +1612,7 @@ public class EntityGraphMapper { if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) { if 
(updatedTagPropagation) { if (CollectionUtils.isEmpty(entitiesToPropagateTo)) { - entitiesToPropagateTo = graphHelper.getImpactedVertices(guid); + entitiesToPropagateTo = graphHelper.getImpactedVerticesWithRestrictions(guid, classificationVertex.getIdForDisplay()); } if (CollectionUtils.isNotEmpty(entitiesToPropagateTo)) { @@ -1625,7 +1640,7 @@ public class EntityGraphMapper { removedPropagations = new HashMap<>(); for (AtlasVertex impactedVertex : impactedVertices) { - List<String> removedClassifications = removedPropagations.get(impactedVertex); + List<AtlasClassification> removedClassifications = removedPropagations.get(impactedVertex); if (removedClassifications == null) { removedClassifications = new ArrayList<>(); @@ -1633,7 +1648,7 @@ public class EntityGraphMapper { removedPropagations.put(impactedVertex, removedClassifications); } - removedClassifications.add(classification.getTypeName()); + removedClassifications.add(classification); } } } @@ -1651,21 +1666,20 @@ public class EntityGraphMapper { } for (AtlasVertex vertex : notificationVertices) { - String entityGuid = GraphHelper.getGuid(vertex); - AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); - AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; - List<AtlasClassification> updatedClassificationList = StringUtils.equals(entityGuid, guid) ? updatedClassifications : Collections.emptyList(); + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + AtlasEntity entity = (entityWithExtInfo != null) ? 
entityWithExtInfo.getEntity() : null; - entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassificationList); + entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications); } if (removedPropagations != null) { - for (Map.Entry<AtlasVertex, List<String>> entry : removedPropagations.entrySet()) { - AtlasVertex vertex = entry.getKey(); - List<String> removedClassifications = entry.getValue(); - String entityGuid = GraphHelper.getGuid(vertex); - AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); - AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; + for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedPropagations.entrySet()) { + AtlasVertex vertex = entry.getKey(); + List<AtlasClassification> removedClassifications = entry.getValue(); + String entityGuid = GraphHelper.getGuid(vertex); + AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); + AtlasEntity entity = (entityWithExtInfo != null) ? entityWithExtInfo.getEntity() : null; entityChangeNotifier.onClassificationDeletedFromEntity(entity, removedClassifications); } @@ -1701,11 +1715,14 @@ public class EntityGraphMapper { AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid); AtlasEntity entity = (entityWithExtInfo != null) ? 
entityWithExtInfo.getEntity() : null; + AtlasClassification classification; if (updatedState == PropagationState.DELETED) { - entityChangeNotifier.onClassificationDeletedFromEntity(entity, Collections.singletonList(classificationName)); + classification = entityRetriever.toAtlasClassification(getClassificationVertex(entityVertex, classificationName)); + + entityChangeNotifier.onClassificationDeletedFromEntity(entity, Collections.singletonList(classification)); } else { - AtlasClassification classification = entityRetriever.toAtlasClassification(propagatedEdge.getInVertex()); + classification = entityRetriever.toAtlasClassification(propagatedEdge.getInVertex()); entityChangeNotifier.onClassificationAddedToEntity(entity, Collections.singletonList(classification)); } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index 7d5b7fb..bd74a7a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -1049,154 +1049,4 @@ public final class EntityGraphRetriever { relationship.setAttribute(attribute.getName(), attrValue); } } - - public void addTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { - if (edge == null) { - return; - } - - AtlasVertex outVertex = edge.getOutVertex(); - AtlasVertex inVertex = edge.getInVertex(); - - if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { - addTagPropagation(outVertex, inVertex, edge); - } - - if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == 
PropagateTags.BOTH) { - addTagPropagation(inVertex, outVertex, edge); - } - } - - public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException { - if (edge == null) { - return; - } - - AtlasVertex outVertex = edge.getOutVertex(); - AtlasVertex inVertex = edge.getInVertex(); - - if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) { - removeTagPropagation(outVertex, inVertex, edge); - } - - if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) { - removeTagPropagation(inVertex, outVertex, edge); - } - } - - private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { - final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); - final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; - - if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size()); - } - - for (AtlasVertex classificationVertex : classificationVertices) { - String classificationName = getTypeName(classificationVertex); - AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex); - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classificationName); - - for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { - if (getClassificationEdge(impactedEntityVertex, classificationVertex) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), 
getTypeName(associatedEntityVertex), classificationName); - } - - continue; - } else if (getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Propagated classification edge already exists from [{}] --> [{}][{}] using edge label: [{}]", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); - } - - continue; - } - - String entityTypeName = getTypeName(impactedEntityVertex); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); - - if (!classificationType.canApplyToEntityType(entityType)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Not creating propagated classification edge from [{}] --> [{}][{}], classification is not applicable for entity type", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex)); - } - - continue; - } - - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Creating propagated classification edge from [{}] --> [{}][{}] using edge label: [{}]", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); - } - - AtlasEdge existingEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex); - - if (existingEdge != null) { - continue; - } - - graphHelper.addClassificationEdge(impactedEntityVertex, classificationVertex, true); - - addToPropagatedTraitNames(impactedEntityVertex, classificationName); - } - } - } - } - - private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { - final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); - final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? 
graphHelper.getIncludedImpactedVerticesWithReferences(toVertex, getRelationshipGuid(edge)) : null; - - if (CollectionUtils.isNotEmpty(impactedEntityVertices)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size()); - } - - for (AtlasVertex classificationVertex : classificationVertices) { - String classificationName = getTypeName(classificationVertex); - AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex); - List<AtlasVertex> referrals = graphHelper.getIncludedImpactedVerticesWithReferences(associatedEntityVertex, getRelationshipGuid(edge)); - - for (AtlasVertex impactedEntityVertex : impactedEntityVertices) { - if (referrals.contains(impactedEntityVertex)) { - if (LOG.isDebugEnabled()) { - if (StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) { - LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex)); - } else { - LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName); - } - } - - continue; - } - - // remove propagated classification edge and classificationName from propagatedTraitNames vertex property - AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex); - - if (propagatedEdge != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with 
edge label: [{}]", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); - } - - graphHelper.removeEdge(propagatedEdge); - - if (getClassificationEdgeState(propagatedEdge) == ACTIVE) { - removeFromPropagatedTraitNames(impactedEntityVertex, classificationName); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist", - getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL); - } - } - } - } - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java index 8fc0327..590fc47 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java @@ -82,6 +82,9 @@ public abstract class AtlasRelationshipStoreV1Test { @Inject EntityGraphMapper graphMapper; + @Inject + AtlasEntityChangeNotifier entityNotifier; + AtlasEntityStore entityStore; AtlasRelationshipStore relationshipStore; AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); @@ -116,7 +119,7 @@ public abstract class AtlasRelationshipStoreV1Test { @BeforeTest public void init() throws Exception { entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper); - relationshipStore = new 
AtlasRelationshipStoreV1(typeRegistry, deleteHandler); + relationshipStore = new AtlasRelationshipStoreV1(typeRegistry, deleteHandler, entityNotifier); RequestContextV1.clear(); RequestContextV1.get().setUser(TestUtilsV2.TEST_USER, null); http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/server-api/src/main/java/org/apache/atlas/RequestContextV1.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java index b7c5686..0fc15e8 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java @@ -18,8 +18,10 @@ package org.apache.atlas; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +32,12 @@ public class RequestContextV1 { private static final ThreadLocal<RequestContextV1> CURRENT_CONTEXT = new ThreadLocal<>(); - private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>(); - private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>(); - private final Map<String, AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>(); - private final long requestTime = System.currentTimeMillis(); + private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>(); + private final Map<String, AtlasObjectId> deletedEntities = new HashMap<>(); + private final Map<String, AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>(); + private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>(); + private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>(); + private final long requestTime = 
System.currentTimeMillis(); private String user; private Set<String> userGroups; @@ -63,6 +67,8 @@ public class RequestContextV1 { instance.updatedEntities.clear(); instance.deletedEntities.clear(); instance.entityCacheV2.clear(); + instance.addedPropagations.clear(); + instance.removedPropagations.clear(); } CURRENT_CONTEXT.remove(); @@ -101,6 +107,42 @@ public class RequestContextV1 { } } + public void recordAddedPropagation(String guid, AtlasClassification classification) { + if (StringUtils.isNotEmpty(guid) && classification != null) { + List<AtlasClassification> classifications = addedPropagations.get(guid); + + if (classifications == null) { + classifications = new ArrayList<>(); + } + + classifications.add(classification); + + addedPropagations.put(guid, classifications); + } + } + + public void recordRemovedPropagation(String guid, AtlasClassification classification) { + if (StringUtils.isNotEmpty(guid) && classification != null) { + List<AtlasClassification> classifications = removedPropagations.get(guid); + + if (classifications == null) { + classifications = new ArrayList<>(); + } + + classifications.add(classification); + + removedPropagations.put(guid, classifications); + } + } + + public Map<String, List<AtlasClassification>> getAddedPropagations() { + return addedPropagations; + } + + public Map<String, List<AtlasClassification>> getRemovedPropagations() { + return removedPropagations; + } + /** * Adds the specified instance to the cache * http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java index 19fae4f..ab379e0 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java +++ 
b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java @@ -62,11 +62,11 @@ public interface EntityChangeListener { * This is upon deleting a trait from a typed instance. * * @param entity the entity - * @param traitNames trait name for the instance that needs to be deleted from entity + * @param traits traits that need to be deleted from entity * * @throws AtlasException if the listener notification fails */ - void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException; + void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException; /** * This is upon updating a trait from a typed instance. http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java ---------------------------------------------------------------------- diff --git a/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java b/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java index 6c2bf22..9af7819 100644 --- a/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java +++ b/tools/atlas-migration-exporter/src/main/java/org/apache/atlas/migration/NoOpNotificationChangeListener.java @@ -44,7 +44,7 @@ public class NoOpNotificationChangeListener implements EntityChangeListener { } @Override - public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException { + public void onTraitsDeleted(Referenceable entity, Collection<? 
extends Struct> traits) throws AtlasException { } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java index ac097b6..1fb032a 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java @@ -89,7 +89,7 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 { } @Override - public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException { + public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { notifyEntityEvents(Collections.singletonList(entity), CLASSIFICATION_DELETE); } http://git-wip-us.apache.org/repos/asf/atlas/blob/2237a895/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index a3e5949..dbdf67d 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -85,7 +85,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener { } @Override - public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException { + public void onTraitsDeleted(Referenceable 
entity, Collection<? extends Struct> traits) throws AtlasException { notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE); }
