Repository: incubator-atlas Updated Branches: refs/heads/master 75bcccd1d -> 511c88670
ATLAS-1463: option to exclude specific entity attributes in audit records Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/511c8867 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/511c8867 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/511c8867 Branch: refs/heads/master Commit: 511c88670d6b0a9b668fec8a1d9513709b4dc2e4 Parents: 75bcccd Author: Sarath Subramanian <[email protected]> Authored: Mon Jan 16 14:15:33 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Jan 19 15:28:45 2017 -0800 ---------------------------------------------------------------------- .../repository/audit/EntityAuditListener.java | 222 +++++++++++++++++-- .../repository/audit/EntityAuditRepository.java | 14 ++ .../audit/HBaseBasedAuditRepository.java | 61 ++++- .../audit/InMemoryEntityAuditRepository.java | 10 + .../audit/NoopEntityAuditRepository.java | 10 + .../typesystem/persistence/StructInstance.java | 4 + 6 files changed, 300 insertions(+), 21 deletions(-) ----------------------------------------------------------------------
NOTE(review): reviewer notes on this patch; they are not part of the commit. Confirm each before acting on it.
NOTE(review): EntityAuditListener.getAuditEventDetail mutates the entity before serializing it:
  - pruneEntityAttributesForAudit calls entity.setNull on excluded attributes;
  - for oversized records, clearAttributeValues calls setNull on every attribute;
  - only then is InstanceSerialization.toJson called.
  The matching addAttributeValues / restoreEntityAttributes calls are not in a finally block. If toJson (or set/setNull) throws, the entity handed to the listener stays modified. Consider try/finally.
NOTE(review): in getAuditEventDetail, "auditBytes != null" is always true, because String.getBytes never returns null.
NOTE(review): pruneEntityAttributesForAudit and restoreEntityAttributes dereference attrInfo.isComposite without a null check. This assumes every key returned by getValuesMap() is present in fieldMapping().fields (TODO confirm).
NOTE(review): HBaseBasedAuditRepository.auditExcludedAttributesCache is a plain HashMap written from getAuditExcludeAttributes. If listeners run on concurrent request threads (presumably; verify), it should be a ConcurrentHashMap. In that case cache an empty list instead of null, since ConcurrentHashMap rejects null values.
NOTE(review): HBaseBasedAuditRepository.initApplicationProperties ignores AtlasException without logging it. When that happens:
  - repositoryMaxSize silently returns ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE (1024 * 1024);
  - getAuditExcludeAttributes returns null, so nothing is excluded.
NOTE(review): StructInstance.setNull gains INT_TYPE and BOOLEAN_TYPE resets. Branches for other primitive types are outside this hunk. Confirm they are handled, because clearAttributeValues calls setNull on every attribute.
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 958ecaf..1ef803c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -21,20 +21,32 @@ package org.apache.atlas.repository.audit; import com.google.inject.Inject; import org.apache.atlas.AtlasException; import 
org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.RequestContext; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Listener on entity create/update/delete, tag add/delete. Adds the corresponding audit event to the audit repository. */ public class EntityAuditListener implements EntityChangeListener { + private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListener.class); + private EntityAuditRepository auditRepository; @Inject @@ -46,44 +58,41 @@ public class EntityAuditListener implements EntityChangeListener { public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); long currentTime = RequestContext.get().getRequestTime(); + for (ITypedReferenceableInstance entity : entities) { - EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, - "Created: " + InstanceSerialization.toJson(entity, true)); + EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_CREATE); events.add(event); } - auditRepository.putEvents(events); - } - private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, - EntityAuditEvent.EntityAuditAction action, String details) - throws 
AtlasException { - return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity); + auditRepository.putEvents(events); } @Override public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); long currentTime = RequestContext.get().getRequestTime(); + for (ITypedReferenceableInstance entity : entities) { - EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, - "Updated: " + InstanceSerialization.toJson(entity, true)); + EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_UPDATE); events.add(event); } + auditRepository.putEvents(events); } @Override public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { - EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), - EntityAuditEvent.EntityAuditAction.TAG_ADD, - "Added trait: " + InstanceSerialization.toJson(trait, true)); + EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), EntityAuditAction.TAG_ADD, + "Added trait: " + InstanceSerialization.toJson(trait, true)); + auditRepository.putEvents(event); } @Override public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { - EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), - EntityAuditEvent.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); + EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(), EntityAuditAction.TAG_DELETE, + "Deleted trait: " + traitName); + auditRepository.putEvents(event); } @@ -91,11 +100,190 @@ public class EntityAuditListener implements EntityChangeListener { public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException { List<EntityAuditEvent> events = new 
ArrayList<>(); long currentTime = RequestContext.get().getRequestTime(); + for (ITypedReferenceableInstance entity : entities) { - EntityAuditEvent event = createEvent(entity, currentTime, - EntityAuditEvent.EntityAuditAction.ENTITY_DELETE, "Deleted entity"); + EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditAction.ENTITY_DELETE, "Deleted entity"); events.add(event); } + auditRepository.putEvents(events); } + + private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action) + throws AtlasException { + String detail = getAuditEventDetail(entity, action); + + return createEvent(entity, ts, action, detail); + } + + private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action, String details) + throws AtlasException { + return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity); + } + + private String getAuditEventDetail(ITypedReferenceableInstance entity, EntityAuditAction action) throws AtlasException { + Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity); + + String auditPrefix = getAuditPrefix(action); + String auditString = auditPrefix + InstanceSerialization.toJson(entity, true); + byte[] auditBytes = auditString.getBytes(StandardCharsets.UTF_8); + long auditSize = auditBytes != null ? auditBytes.length : 0; + long auditMaxSize = auditRepository.repositoryMaxSize(); + + if (auditMaxSize >= 0 && auditSize > auditMaxSize) { // don't store attributes in audit + LOG.warn("audit record too long: entityType={}, guid={}, size={}; maxSize={}. 
entity attribute values not stored in audit", + entity.getTypeName(), entity.getId()._getId(), auditSize, auditMaxSize); + + Map<String, Object> attrValues = entity.getValuesMap(); + + clearAttributeValues(entity); + + auditString = auditPrefix + InstanceSerialization.toJson(entity, true); + + addAttributeValues(entity, attrValues); + } + + restoreEntityAttributes(entity, prunedAttributes); + + return auditString; + } + + private void clearAttributeValues(IReferenceableInstance entity) throws AtlasException { + Map<String, Object> attributesMap = entity.getValuesMap(); + + if (MapUtils.isNotEmpty(attributesMap)) { + for (String attribute : attributesMap.keySet()) { + entity.setNull(attribute); + } + } + } + + private void addAttributeValues(ITypedReferenceableInstance entity, Map<String, Object> attributesMap) throws AtlasException { + if (MapUtils.isNotEmpty(attributesMap)) { + for (String attr : attributesMap.keySet()) { + entity.set(attr, attributesMap.get(attr)); + } + } + } + + private Map<String, Object> pruneEntityAttributesForAudit(ITypedReferenceableInstance entity) throws AtlasException { + Map<String, Object> ret = null; + Map<String, Object> entityAttributes = entity.getValuesMap(); + List<String> excludeAttributes = auditRepository.getAuditExcludeAttributes(entity.getTypeName()); + + if (CollectionUtils.isNotEmpty(excludeAttributes) && MapUtils.isNotEmpty(entityAttributes)) { + Map<String, AttributeInfo> attributeInfoMap = entity.fieldMapping().fields; + + for (String attrName : entityAttributes.keySet()) { + Object attrValue = entityAttributes.get(attrName); + AttributeInfo attrInfo = attributeInfoMap.get(attrName); + + if (excludeAttributes.contains(attrName)) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attrName, attrValue); + entity.setNull(attrName); + } else if (attrInfo.isComposite) { + if (attrValue instanceof Collection) { + for (Object attribute : (Collection) attrValue) { + if (attribute instanceof 
ITypedReferenceableInstance) { + ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attribute; + Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance); + + if (MapUtils.isNotEmpty(prunedAttrs)) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attrInstance.getId()._getId(), prunedAttrs); + } + } + } + } else if (attrValue instanceof ITypedReferenceableInstance) { + ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attrValue; + Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance); + + if (MapUtils.isNotEmpty(prunedAttrs)) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attrInstance.getId()._getId(), prunedAttrs); + } + } + } + } + } + + return ret; + } + + private void restoreEntityAttributes(ITypedReferenceableInstance entity, Map<String, Object> prunedAttributes) throws AtlasException { + if (MapUtils.isEmpty(prunedAttributes)) { + return; + } + + Map<String, Object> entityAttributes = entity.getValuesMap(); + + if (MapUtils.isNotEmpty(entityAttributes)) { + Map<String, AttributeInfo> attributeInfoMap = entity.fieldMapping().fields; + + for (String attrName : entityAttributes.keySet()) { + Object attrValue = entityAttributes.get(attrName); + AttributeInfo attrInfo = attributeInfoMap.get(attrName); + + if (prunedAttributes.containsKey(attrName)) { + entity.set(attrName, prunedAttributes.get(attrName)); + } else if (attrInfo.isComposite) { + if (attrValue instanceof Collection) { + for (Object attributeEntity : (Collection) attrValue) { + if (attributeEntity instanceof ITypedReferenceableInstance) { + ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attributeEntity; + Object obj = prunedAttributes.get(attrInstance.getId()._getId()); + + if (obj instanceof Map) { + restoreEntityAttributes(attrInstance, (Map) obj); + } + } + } + } else if (attrValue instanceof ITypedReferenceableInstance) { + ITypedReferenceableInstance 
attrInstance = (ITypedReferenceableInstance) attrValue; + Object obj = prunedAttributes.get(attrInstance.getId()._getId()); + + if (obj instanceof Map) { + restoreEntityAttributes(attrInstance, (Map) obj); + } + } + } + } + } + } + + private String getAuditPrefix(EntityAuditAction action) { + final String ret; + + switch (action) { + case ENTITY_CREATE: + ret = "Created: "; + break; + case ENTITY_UPDATE: + ret = "Updated: "; + break; + case ENTITY_DELETE: + ret = "Deleted: "; + break; + case TAG_ADD: + ret = "Added trait: "; + break; + case TAG_DELETE: + ret = "Deleted trait: "; + break; + default: + ret = "Unknown: "; + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java index 417092a..9dc7835 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java @@ -50,4 +50,18 @@ public interface EntityAuditRepository { * @throws AtlasException */ List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException; + + /** + * Returns maximum allowed repository size per EntityAuditEvent + * @throws AtlasException + */ + long repositoryMaxSize() throws AtlasException; + + /** + * list of attributes to be excluded when storing in audit repo. 
+ * @param entityType type of entity + * @return list of attribute names to be excluded + * @throws AtlasException + */ + List<String> getAuditExcludeAttributes(String entityType) throws AtlasException; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 5099521..fb05e5a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -52,8 +52,10 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; /** * HBase based repository for entity audit events @@ -74,9 +76,6 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository public static final String CONFIG_PREFIX = "atlas.audit"; public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename"; public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS"; - - private static final String FIELD_SEPARATOR = ":"; - public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition"; public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt"); @@ -85,7 +84,15 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository public static final byte[] COLUMN_USER = Bytes.toBytes("u"); public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f"); - private static boolean persistEntityDefinition; + private static final String 
AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize"; + private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity"; + private static final String FIELD_SEPARATOR = ":"; + private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024; + private static Configuration APPLICATION_PROPERTIES = null; + + private static boolean persistEntityDefinition; + + private Map<String, List<String>> auditExcludedAttributesCache = new HashMap<>(); static { try { @@ -219,6 +226,52 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } } + @Override + public long repositoryMaxSize() throws AtlasException { + long ret; + initApplicationProperties(); + + if (APPLICATION_PROPERTIES == null) { + ret = ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE; + } else { + ret = APPLICATION_PROPERTIES.getLong(AUDIT_REPOSITORY_MAX_SIZE_PROPERTY, ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE); + } + + return ret; + } + + @Override + public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + List<String> ret = null; + + initApplicationProperties(); + + if (auditExcludedAttributesCache.containsKey(entityType)) { + ret = auditExcludedAttributesCache.get(entityType); + } else if (APPLICATION_PROPERTIES != null) { + String[] excludeAttributes = APPLICATION_PROPERTIES.getStringArray(AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY + "." + + entityType + "." 
+ "attributes.exclude"); + + if (excludeAttributes != null) { + ret = Arrays.asList(excludeAttributes); + } + + auditExcludedAttributesCache.put(entityType, ret); + } + + return ret; + } + + private void initApplicationProperties() { + if (APPLICATION_PROPERTIES == null) { + try { + APPLICATION_PROPERTIES = ApplicationProperties.get(); + } catch (AtlasException ex) { + // ignore + } + } + } + private String getResultString(Result result, byte[] columnName) { byte[] rawValue = result.getValue(COLUMN_FAMILY, columnName); if ( rawValue != null) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index cf76596..50a007b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -66,4 +66,14 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { } return events; } + + @Override + public long repositoryMaxSize() throws AtlasException { + return -1; + } + + @Override + public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java index 9f77bfe..d4d3c20 
100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java @@ -47,4 +47,14 @@ public class NoopEntityAuditRepository implements EntityAuditRepository { throws AtlasException { return Collections.emptyList(); } + + @Override + public long repositoryMaxSize() throws AtlasException { + return -1; + } + + @Override + public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/511c8867/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java b/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java index 9a9beff..fc10d07 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/persistence/StructInstance.java @@ -254,6 +254,10 @@ public class StructInstance implements ITypedStruct { bigDecimals[pos] = null; } else if (i.dataType() == DataTypes.DATE_TYPE) { dates[pos] = null; + } else if (i.dataType() == DataTypes.INT_TYPE) { + ints[pos] = 0; + } else if (i.dataType() == DataTypes.BOOLEAN_TYPE) { + bools[pos] = false; } else if (i.dataType() == DataTypes.STRING_TYPE) { strings[pos] = null; } else if (i.dataType().getTypeCategory() == DataTypes.TypeCategory.ARRAY) {
