This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 48b0566 ATLAS-3689: added entity-audit entries on business attributes
add/update/delete to an entity
48b0566 is described below
commit 48b056689bc19ac01f253898c37a7f444c0c6852
Author: Mandar Ambawane <[email protected]>
AuthorDate: Sun Apr 12 13:22:02 2020 +0530
ATLAS-3689: added entity-audit entries on business attributes
add/update/delete to an entity
Signed-off-by: Madhan Neethiraj <[email protected]>
(cherry picked from commit 80135a8dbedfbaff444267681aa13d958e867c7e)
---
dashboardv2/public/js/utils/Enums.js | 5 +-
.../js/views/audit/CreateAuditTableLayoutView.js | 8 +-
.../atlas/listener/EntityChangeListenerV2.java | 9 +++
.../atlas/model/audit/EntityAuditEventV2.java | 5 +-
.../repository/audit/EntityAuditListenerV2.java | 82 ++++++++++++-------
.../store/graph/v2/AtlasEntityChangeNotifier.java | 26 +++++-
.../store/graph/v2/EntityGraphMapper.java | 84 ++++++++++++++-----
.../store/graph/v2/IAtlasEntityChangeNotifier.java | 57 +++++++++++++
.../v2/bulkimport/EntityChangeNotifierNop.java | 94 ++++++++++++++++++++++
.../notification/EntityNotificationListenerV2.java | 5 ++
10 files changed, 317 insertions(+), 58 deletions(-)
diff --git a/dashboardv2/public/js/utils/Enums.js
b/dashboardv2/public/js/utils/Enums.js
index e2d8cd2..780dad2 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -39,8 +39,7 @@ define(['require'], function(require) {
LABEL_ADD: "Label(s) Added",
LABEL_DELETE: "Label(s) Deleted",
ENTITY_PURGE: "Entity Purged",
- BUSINESS_ATTRIBUTE_ADD: "Business Attribute(s) Added",
- BUSINESS_ATTRIBUTE_DELETE: "Business Attribute(s) Deleted"
+ BUSINESS_ATTRIBUTE_UPDATE: "Business Attribute(s) Updated"
}
Enums.entityStateReadOnly = {
@@ -214,4 +213,4 @@ define(['require'], function(require) {
1: "true"
};
return Enums;
-});
\ No newline at end of file
+});
diff --git a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
index a820616..f7673ce 100644
--- a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
+++ b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
@@ -124,7 +124,11 @@ define(['require',
relationshipAttributes =
parseDetailsObject.relationshipAttributes;
if (attributesDetails) {
that.ui.attributeDetails.removeClass('hide');
- that.action.indexOf("Classification")
=== -1 ? that.ui.panelAttrHeading.html("Technical properties ") :
that.ui.panelAttrHeading.html("Properties ");
+ if (that.action.indexOf("Classification") !== -1 || that.action.indexOf("Business Attribute") !== -1) { // these audit actions show plain properties
+ that.ui.panelAttrHeading.html("Properties ");
+ } else { // every other audit action shows technical properties
+ that.ui.panelAttrHeading.html("Technical properties ");
+ }
var attrTable =
that.createTableWithValues(attributesDetails);
that.ui.attributeCard.html(
attrTable);
@@ -174,4 +178,4 @@ define(['require',
}
});
return CreateAuditTableLayoutView;
-});
\ No newline at end of file
+});
diff --git
a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index 2394a12..374d0dd 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -173,4 +174,12 @@ public interface EntityChangeListenerV2 {
* @throws AtlasBaseException if the listener notification fails
*/
void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws
AtlasBaseException;
+
+ /**
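+ * Notifies the listener that business attributes of an entity were added, updated or removed.
+ * @param entity the entity
+ * @param updatedBusinessAttributes changed attribute values, keyed by business-metadata name and then by attribute name; a null value marks a removed attribute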
+ * @throws AtlasBaseException if the listener notification fails
+ */
+ void onBusinessAttributesUpdated(AtlasEntity entity, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
}
\ No newline at end of file
diff --git
a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 9301e21..bcfdd94 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -50,7 +50,8 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE,
PROPAGATED_CLASSIFICATION_UPDATE,
- TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
+ TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE,
+ BUSINESS_ATTRIBUTE_UPDATE;
public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) {
@@ -91,6 +92,8 @@ public class EntityAuditEventV2 implements Serializable {
return LABEL_ADD;
case "LABEL_DELETE":
return LABEL_DELETE;
+ case "BUSINESS_ATTRIBUTE_UPDATE":
+ return BUSINESS_ATTRIBUTE_UPDATE;
}
throw new IllegalArgumentException("No enum constant " +
EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index cab4e1e..4c1e1a9 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -28,11 +28,13 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -50,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
@@ -341,6 +344,57 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
}
}
+ @Override
+ public void onRelationshipsAdded(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New relationship(s) added to repository(" +
relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsUpdated(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsDeleted(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) deleted from repository(" +
relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsPurged(List<AtlasRelationship> relationships)
throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) purged from repository(" +
relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+
+ List<EntityAuditEventV2> auditEvents = new ArrayList<>();
+
+ for (Map.Entry<String, Map<String, Object>> entry :
updatedBusinessAttributes.entrySet()) {
+ String bmName = entry.getKey();
+ Map<String, Object> attributes = entry.getValue();
+ String details = AtlasJson.toJson(new
AtlasStruct(bmName, attributes));
+ EntityAuditEventV2 auditEvent = createEvent(entity,
BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
+
+ auditEvents.add(auditEvent);
+ }
+
+ auditRepository.putEventsV2(auditEvents);
+
+ RequestContext.get().endMetricRecord(metric);
+ }
+ }
+
+
private EntityAuditEventV2 createEvent(AtlasEntity entity,
EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(),
RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action,
details, entity);
@@ -566,32 +620,4 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
return ret;
}
-
- @Override
- public void onRelationshipsAdded(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("New relationship(s) added to repository(" +
relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsUpdated(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsDeleted(List<AtlasRelationship> relationships,
boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) deleted from repository(" +
relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsPurged(List<AtlasRelationship> relationships)
throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) purged from repository(" +
relationships.size() + ")");
- }
- }
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index d7020a7..0dc3193 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -66,7 +66,7 @@ import static
org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
@Component
-public class AtlasEntityChangeNotifier {
+public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
private static final Logger LOG =
LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
private final Set<EntityChangeListener> entityChangeListeners;
@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
this.isV2EntityNotificationEnabled =
AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
}
+ @Override
public void onEntitiesMutated(EntityMutationResponse
entityMutationResponse, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities();
}
+ @Override
public void notifyRelationshipMutation(AtlasRelationship relationship,
EntityNotification.EntityNotificationV2.OperationType operationType) throws
AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationAddedToEntity(AtlasEntity entity,
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid());
@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsAddedToEntities(List<AtlasEntity> entities,
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMappingHelper(entities);
@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationUpdatedToEntity(AtlasEntity entity,
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationDeletedFromEntity(AtlasEntity entity,
List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsDeletedFromEntities(List<AtlasEntity>
entities, List<AtlasClassification> deletedClassifications) throws
AtlasBaseException {
doFullTextMappingHelper(entities);
@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermAddedToEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2
notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermDeletedFromEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2
notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onLabelsUpdatedFromEntity(String entityGuid, Set<String>
addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
doFullTextMapping(entityGuid);
@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContext context =
RequestContext.get();
Map<String, List<AtlasClassification>> addedPropagations =
context.getAddedPropagations();
@@ -348,6 +359,18 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities(removedPropagations,
PROPAGATED_CLASSIFICATION_DELETE);
}
+ @Override
+ public void onBusinessAttributesUpdated(String entityGuid, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
+ if (isV2EntityNotificationEnabled) {
+ AtlasEntity entity =
instanceConverter.getAndCacheEntity(entityGuid);
+
+ for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+ listener.onBusinessAttributesUpdated(entity,
updatedBusinessAttributes);
+ }
+ }
+ }
+
+
private void notifyPropagatedEntities(Map<String,
List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action)
throws AtlasBaseException {
if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
return;
@@ -752,5 +775,4 @@ public class AtlasEntityChangeNotifier {
}
}
}
-
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 9269ae6..fc09015 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -399,24 +399,6 @@ public class EntityGraphMapper {
}
}
- private void updateLabels(AtlasVertex vertex, Set<String> labels) {
- if (CollectionUtils.isNotEmpty(labels)) {
- AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY,
getLabelString(labels));
- } else {
- vertex.removeProperty(LABELS_PROPERTY_KEY);
- }
- }
-
- private String getLabelString(Collection<String> labels) {
- String ret = null;
-
- if (!labels.isEmpty()) {
- ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER,
labels) + LABEL_NAME_DELIMITER;
- }
-
- return ret;
- }
-
/*
* reset/overwrite business attributes of the entity with given values
*/
@@ -426,6 +408,7 @@ public class EntityGraphMapper {
}
Map<String, Map<String, AtlasBusinessAttribute>>
entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>>
updatedBusinessAttributes = new HashMap<>();
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry :
entityTypeBusinessAttributes.entrySet()) {
String bmName =
entry.getKey();
@@ -444,6 +427,8 @@ public class EntityGraphMapper {
}
mapAttribute(bmAttribute, bmAttrNewValue,
entityVertex, CREATE, new EntityMutationContext());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute,
bmAttrNewValue);
}
} else {
if (bmAttrNewValue != null) {
@@ -453,6 +438,8 @@ public class EntityGraphMapper {
}
mapAttribute(bmAttribute, bmAttrNewValue,
entityVertex, UPDATE, new EntityMutationContext());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute,
bmAttrNewValue);
}
} else {
if (LOG.isDebugEnabled()) {
@@ -460,11 +447,17 @@ public class EntityGraphMapper {
}
entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute,
bmAttrNewValue);
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex),
updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== setBusinessAttributes(entityVertex={},
entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(),
businessAttributes);
}
@@ -479,6 +472,7 @@ public class EntityGraphMapper {
}
Map<String, Map<String, AtlasBusinessAttribute>>
entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>>
updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) &&
MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry
: entityTypeBusinessAttributes.entrySet()) {
@@ -503,16 +497,24 @@ public class EntityGraphMapper {
if (existingValue == null) {
if (bmAttrValue != null) {
mapAttribute(bmAttribute, bmAttrValue,
entityVertex, CREATE, new EntityMutationContext());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute,
bmAttrValue);
}
} else {
if (!Objects.equals(existingValue, bmAttrValue)) {
mapAttribute(bmAttribute, bmAttrValue,
entityVertex, UPDATE, new EntityMutationContext());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute,
bmAttrValue);
}
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex),
updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== addOrUpdateBusinessAttributes(entityVertex={},
entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(),
businessAttributes);
}
@@ -521,12 +523,13 @@ public class EntityGraphMapper {
/*
* remove the given business attributes from the entity
*/
- public void removeBusinessAttributes(AtlasVertex entityVertex,
AtlasEntityType entityType, Map<String, Map<String, Object>>
businessAttributes) {
+ public void removeBusinessAttributes(AtlasVertex entityVertex,
AtlasEntityType entityType, Map<String, Map<String, Object>>
businessAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> removeBusinessAttributes(entityVertex={},
entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(),
businessAttributes);
}
Map<String, Map<String, AtlasBusinessAttribute>>
entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>>
updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) &&
MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry
: entityTypeBusinessAttributes.entrySet()) {
@@ -539,16 +542,22 @@ public class EntityGraphMapper {
Map<String, Object> entityBmAttributes =
businessAttributes.get(bmName);
- for (AtlasBusinessAttribute bmttribute :
bmAttributes.values()) {
+ for (AtlasBusinessAttribute bmAttribute :
bmAttributes.values()) {
// if (entityBmAttributes is empty) remove all attributes
in this business-metadata
// else remove the attribute only if its given in
entityBmAttributes
- if (MapUtils.isEmpty(entityBmAttributes) ||
entityBmAttributes.containsKey(bmttribute.getName())) {
-
entityVertex.removeProperty(bmttribute.getVertexPropertyName());
+ if (MapUtils.isEmpty(entityBmAttributes) ||
entityBmAttributes.containsKey(bmAttribute.getName())) {
+
entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+
addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, null);
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex),
updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== removeBusinessAttributes(entityVertex={},
entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(),
businessAttributes);
}
@@ -2559,4 +2568,35 @@ public class EntityGraphMapper {
return propagatedEntities;
}
+
+ private void updateLabels(AtlasVertex vertex, Set<String> labels) {
+ if (CollectionUtils.isNotEmpty(labels)) {
+ AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY,
getLabelString(labels));
+ } else {
+ vertex.removeProperty(LABELS_PROPERTY_KEY);
+ }
+ }
+
+ private String getLabelString(Collection<String> labels) {
+ String ret = null;
+
+ if (!labels.isEmpty()) {
+ ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER,
labels) + LABEL_NAME_DELIMITER;
+ }
+
+ return ret;
+ }
+
+ private void addToUpdatedBusinessAttributes(Map<String, Map<String,
Object>> updatedBusinessAttributes, AtlasBusinessAttribute bmAttribute, Object
attrValue) {
+ String bmName =
bmAttribute.getDefinedInType().getTypeName();
+ Map<String, Object> attributes = updatedBusinessAttributes.get(bmName);
+
+ if(attributes == null){
+ attributes = new HashMap<>();
+
+ updatedBusinessAttributes.put(bmName, attributes);
+ }
+
+ attributes.put(bmAttribute.getName(), attrValue);
+ }
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..f617ef9
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface IAtlasEntityChangeNotifier {
+ void onEntitiesMutated(EntityMutationResponse entityMutationResponse,
boolean isImport) throws AtlasBaseException;
+
+ void notifyRelationshipMutation(AtlasRelationship relationship,
EntityNotification.EntityNotificationV2.OperationType operationType) throws
AtlasBaseException;
+
+ void onClassificationAddedToEntity(AtlasEntity entity,
List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ void onClassificationsAddedToEntities(List<AtlasEntity> entities,
List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ void onClassificationDeletedFromEntity(AtlasEntity entity,
List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ void onClassificationsDeletedFromEntities(List<AtlasEntity> entities,
List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ void onTermAddedToEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ void onTermDeletedFromEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels,
Set<String> deletedLabels) throws AtlasBaseException;
+
+ void notifyPropagatedEntities() throws AtlasBaseException;
+
+ void onClassificationUpdatedToEntity(AtlasEntity entity,
List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
+
+ void onBusinessAttributesUpdated(String entityGuid, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
+}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
new file mode 100644
index 0000000..05516f2
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
+ @Override
+ public void onEntitiesMutated(EntityMutationResponse
entityMutationResponse, boolean isImport) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyRelationshipMutation(AtlasRelationship relationship,
EntityNotification.EntityNotificationV2.OperationType operationType) throws
AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationAddedToEntity(AtlasEntity entity,
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsAddedToEntities(List<AtlasEntity> entities,
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationDeletedFromEntity(AtlasEntity entity,
List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsDeletedFromEntities(List<AtlasEntity>
entities, List<AtlasClassification> deletedClassifications) throws
AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermAddedToEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermDeletedFromEntities(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onLabelsUpdatedFromEntity(String entityGuid, Set<String>
addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyPropagatedEntities() throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationUpdatedToEntity(AtlasEntity entity,
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onBusinessAttributesUpdated(String entityGuid, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+
+ }
+}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 6d64fec..a677b31 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -316,4 +316,9 @@ public class EntityNotificationListenerV2 implements
EntityChangeListenerV2 {
public void onRelationshipsPurged(List<AtlasRelationship> relationships)
throws AtlasBaseException {
// do nothing -> notification not sent out for term purged from
entities as its been sent in case of delete
}
+
+ @Override
+ public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String,
Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
+ // do nothing -> no notification is sent out when business metadata
attributes are updated on an entity
+ }
}
\ No newline at end of file