ATLAS-2712: Update v2 Audit API to handle v1 to v2 conversion Signed-off-by: Madhan Neethiraj <mad...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/d343a486 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/d343a486 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/d343a486 Branch: refs/heads/master Commit: d343a4860df49e767eeefd46ab5fa2b147cc5cd3 Parents: 47ec9f7 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Tue May 22 18:38:33 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue May 22 23:05:07 2018 -0700 ---------------------------------------------------------------------- .../atlas/model/audit/EntityAuditEventV2.java | 51 +++++-- .../AbstractStorageBasedAuditRepository.java | 19 ++- .../audit/CassandraBasedAuditRepository.java | 4 +- .../repository/audit/EntityAuditListener.java | 42 +++++- .../repository/audit/EntityAuditListenerV2.java | 76 +++++++--- .../audit/HBaseBasedAuditRepository.java | 151 ++++++++++++++++--- .../converters/AtlasInstanceConverter.java | 28 ++-- .../graph/v1/AtlasEntityChangeNotifier.java | 8 +- .../audit/AuditRepositoryTestBase.java | 8 +- 9 files changed, 300 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java index d14f6ae..787f5a9 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -32,6 +32,7 @@ import java.util.Objects; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2; /** * Structure of v2 entity audit event @@ -42,13 +43,15 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @XmlRootElement @XmlAccessorType(XmlAccessType.PROPERTY) public class EntityAuditEventV2 implements Serializable { - public enum EntityAuditAction { + public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 } + + public enum EntityAuditActionV2 { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE; - public static EntityAuditAction fromString(String strValue) { + public static EntityAuditActionV2 fromString(String strValue) { switch (strValue) { case "ENTITY_CREATE": return ENTITY_CREATE; @@ -79,28 +82,35 @@ public class EntityAuditEventV2 implements Serializable { return PROPAGATED_CLASSIFICATION_UPDATE; } - throw new IllegalArgumentException("No enum constant " + EntityAuditAction.class.getCanonicalName() + "." + strValue); + throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue); } } - private String entityId; - private long timestamp; - private String user; - private EntityAuditAction action; - private String details; - private String eventKey; - private AtlasEntity entity; + private String entityId; + private long timestamp; + private String user; + private EntityAuditActionV2 action; + private String details; + private String eventKey; + private AtlasEntity entity; + private EntityAuditType type; public EntityAuditEventV2() { } - public EntityAuditEventV2(String entityId, long timestamp, String user, EntityAuditAction action, String details, + public EntityAuditEventV2(String entityId, long timestamp, String user, EntityAuditActionV2 action, String details, AtlasEntity entity) { + this(entityId, timestamp, user, action, details, entity, ENTITY_AUDIT_V2); + } + + public EntityAuditEventV2(String entityId, long timestamp, String user, EntityAuditActionV2 action, String details, + AtlasEntity entity, EntityAuditType auditType) { setEntityId(entityId); setTimestamp(timestamp); setUser(user); setAction(action); setDetails(details); setEntity(entity); + setType(auditType); } public String getEntityId() { @@ -127,11 +137,11 @@ public class EntityAuditEventV2 implements Serializable { this.user = user; } - public EntityAuditAction getAction() { + public EntityAuditActionV2 getAction() { return action; } - public void setAction(EntityAuditAction action) { + public void setAction(EntityAuditActionV2 action) { this.action = action; } @@ -159,6 +169,14 @@ public class EntityAuditEventV2 implements Serializable { this.entity = entity; } + public EntityAuditType getType() { + return type; + } + + public void setType(EntityAuditType type) { + this.type = type; + } + @JsonIgnore public String getEntityDefinitionString() { if (entity != null) { @@ -185,17 +203,19 @@ public class EntityAuditEventV2 implements Serializable { action == that.action && Objects.equals(details, that.details) && Objects.equals(eventKey, that.eventKey) && - Objects.equals(entity, that.entity); + Objects.equals(entity, that.entity) && + Objects.equals(type, that.type); } @Override public int hashCode() { - return Objects.hash(entityId, timestamp, user, action, details, eventKey, entity); + return Objects.hash(entityId, timestamp, user, action, details, eventKey, entity, type); } @Override public String toString() { final StringBuilder sb = new StringBuilder("EntityAuditEventV2{"); + sb.append("entityId='").append(entityId).append('\''); sb.append(", timestamp=").append(timestamp); sb.append(", user='").append(user).append('\''); @@ -203,6 +223,7 @@ public class EntityAuditEventV2 implements Serializable { sb.append(", details='").append(details).append('\''); sb.append(", eventKey='").append(eventKey).append('\''); sb.append(", entity=").append(entity); + sb.append(", type=").append(type); sb.append('}'); return sb.toString(); http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java index 39b1ef2..f6b741b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java @@ -42,17 +42,16 @@ import java.util.Map; public abstract class AbstractStorageBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); - private static final String AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize"; - private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity"; - protected static final String FIELD_SEPARATOR = ":"; - private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024; - protected static Configuration APPLICATION_PROPERTIES = null; - public static final String CONFIG_PREFIX = "atlas.audit"; - public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition"; - + private static final String AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize"; + private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity"; + private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024; + public static final String CONFIG_PREFIX = "atlas.audit"; + public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition"; + protected static final String FIELD_SEPARATOR = ":"; + + protected static Configuration APPLICATION_PROPERTIES = null; protected Map<String, List<String>> auditExcludedAttributesCache = new HashMap<>(); - - protected static boolean persistEntityDefinition; + protected static boolean persistEntityDefinition; static { try { http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java index 9d6aaae..8b185a3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java @@ -25,13 +25,11 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,7 +174,7 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo } EntityAuditEventV2 event = new EntityAuditEventV2(); event.setEntityId(rowEntityId); - event.setAction(EntityAuditEventV2.EntityAuditAction.fromString(row.getString(ACTION))); + event.setAction(EntityAuditEventV2.EntityAuditActionV2.fromString(row.getString(ACTION))); event.setDetails(row.getString(DETAIL)); event.setUser(row.getString(USER)); event.setTimestamp(row.getLong(CREATED)); http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 6e868e6..a085e8e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -146,7 +146,7 @@ public class EntityAuditListener implements EntityChangeListener { private String getAuditEventDetail(Referenceable entity, EntityAuditAction action) throws AtlasException { Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity); - String auditPrefix = getAuditPrefix(action); + String auditPrefix = getV1AuditPrefix(action); String auditString = auditPrefix + AtlasType.toV1Json(entity); byte[] auditBytes = auditString.getBytes(StandardCharsets.UTF_8); long auditSize = auditBytes != null ? auditBytes.length : 0; @@ -259,7 +259,7 @@ public class EntityAuditListener implements EntityChangeListener { } } - private String getAuditPrefix(EntityAuditAction action) { + public static String getV1AuditPrefix(EntityAuditAction action) { final String ret; switch (action) { @@ -296,4 +296,42 @@ public class EntityAuditListener implements EntityChangeListener { return ret; } + + public static String getV2AuditPrefix(EntityAuditAction action) { + final String ret; + + switch (action) { + case ENTITY_CREATE: + ret = "Created: "; + break; + case ENTITY_UPDATE: + ret = "Updated: "; + break; + case ENTITY_DELETE: + ret = "Deleted: "; + break; + case TAG_ADD: + ret = "Added classification: "; + break; + case TAG_DELETE: + ret = "Deleted classification: "; + break; + case TAG_UPDATE: + ret = "Updated classification: "; + break; + case ENTITY_IMPORT_CREATE: + ret = "Created by import: "; + break; + case ENTITY_IMPORT_UPDATE: + ret = "Updated by import: "; + break; + case ENTITY_IMPORT_DELETE: + ret = "Deleted by import: "; + break; + default: + ret = "Unknown: "; + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index 4fd2fd9..970b14a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -17,9 +17,11 @@ */ package org.apache.atlas.repository.audit; +import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.model.audit.EntityAuditEventV2; -import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListenerV2; import org.apache.atlas.model.instance.AtlasClassification; @@ -42,18 +44,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_ADD; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_CREATE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE; @Component public class EntityAuditListenerV2 implements EntityChangeListenerV2 { @@ -165,21 +167,21 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { } } - private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action, String details) { + private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) { return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(), RequestContextV1.get().getUser(), action, details, entity); } - private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action) { + private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action) { String detail = getAuditEventDetail(entity, action); return createEvent(entity, action, detail); } - private String getAuditEventDetail(AtlasEntity entity, EntityAuditAction action) { + private String getAuditEventDetail(AtlasEntity entity, EntityAuditActionV2 action) { Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity); - String auditPrefix = getAuditPrefix(action); + String auditPrefix = getV2AuditPrefix(action); String auditString = auditPrefix + AtlasType.toJson(entity); byte[] auditBytes = auditString.getBytes(StandardCharsets.UTF_8); long auditSize = auditBytes != null ? auditBytes.length : 0; @@ -277,7 +279,45 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { } } - private String getAuditPrefix(EntityAuditAction action) { + private String getV1AuditPrefix(EntityAuditAction action) { + final String ret; + + switch (action) { + case ENTITY_CREATE: + ret = "Created: "; + break; + case ENTITY_UPDATE: + ret = "Updated: "; + break; + case ENTITY_DELETE: + ret = "Deleted: "; + break; + case TAG_ADD: + ret = "Added classification: "; + break; + case TAG_DELETE: + ret = "Deleted classification: "; + break; + case TAG_UPDATE: + ret = "Updated classification: "; + break; + case ENTITY_IMPORT_CREATE: + ret = "Created by import: "; + break; + case ENTITY_IMPORT_UPDATE: + ret = "Updated by import: "; + break; + case ENTITY_IMPORT_DELETE: + ret = "Deleted by import: "; + break; + default: + ret = "Unknown: "; + } + + return ret; + } + + private String getV2AuditPrefix(EntityAuditActionV2 action) { final String ret; switch (action) { http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index e55864b..3656b85 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -22,12 +22,21 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.model.audit.EntityAuditEventV2; -import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -50,6 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import javax.inject.Inject; import javax.inject.Singleton; import java.io.Closeable; import java.io.IOException; @@ -57,6 +67,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V1; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2; +import static org.apache.atlas.repository.audit.EntityAuditListener.getV2AuditPrefix; + /** * HBase based repository for entity audit events * <p> @@ -77,17 +95,23 @@ import java.util.List; public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository { private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); - public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename"; + public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename"; public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS"; - - public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt"); - public static final byte[] COLUMN_ACTION = Bytes.toBytes("a"); - public static final byte[] COLUMN_DETAIL = Bytes.toBytes("d"); - public static final byte[] COLUMN_USER = Bytes.toBytes("u"); - public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f"); + public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt"); + public static final byte[] COLUMN_ACTION = Bytes.toBytes("a"); + public static final byte[] COLUMN_DETAIL = Bytes.toBytes("d"); + public static final byte[] COLUMN_USER = Bytes.toBytes("u"); + public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f"); + public static final byte[] COLUMN_TYPE = Bytes.toBytes("t"); private TableName tableName; private Connection connection; + private final AtlasInstanceConverter instanceConverter; + + @Inject + public HBaseBasedAuditRepository(AtlasInstanceConverter instanceConverter) { + this.instanceConverter = instanceConverter; + } /** * Add events to the event repository @@ -101,23 +125,32 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito } Table table = null; + try { - table = connection.getTable(tableName); + table = connection.getTable(tableName); List<Put> puts = new ArrayList<>(events.size()); for (int index = 0; index < events.size(); index++) { EntityAuditEvent event = events.get(index); - LOG.debug("Adding entity audit event {}", event); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding entity audit event {}", event); + } + Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index)); + addColumn(put, COLUMN_ACTION, event.getAction()); addColumn(put, COLUMN_USER, event.getUser()); addColumn(put, COLUMN_DETAIL, event.getDetails()); + addColumn(put, COLUMN_TYPE, ENTITY_AUDIT_V1); + if (persistEntityDefinition) { addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString()); } + puts.add(put); } + table.put(puts); } catch (IOException e) { throw new AtlasException(e); @@ -150,6 +183,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito addColumn(put, COLUMN_ACTION, event.getAction()); addColumn(put, COLUMN_USER, event.getUser()); addColumn(put, COLUMN_DETAIL, event.getDetails()); + addColumn(put, COLUMN_TYPE, ENTITY_AUDIT_V2); if (persistEntityDefinition) { addColumn(put, COLUMN_DEFINITION, event.getEntity()); @@ -216,16 +250,14 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito if (!event.getEntityId().equals(entityId)) { continue; } + event.setUser(getResultString(result, COLUMN_USER)); - event.setAction(EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION))); - event.setDetails(getResultString(result, COLUMN_DETAIL)); + event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION))); + event.setDetails(getEntityDetails(result)); + event.setType(getAuditType(result)); if (persistEntityDefinition) { - String colDef = getResultString(result, COLUMN_DEFINITION); - - if (colDef != null) { - event.setEntityDefinition(colDef); - } + event.setEntityDefinition(getEntityDefinition(result)); } events.add(event); @@ -248,6 +280,91 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito } } + private String getEntityDefinition(Result result) throws AtlasBaseException { + String ret = getResultString(result, COLUMN_DEFINITION); + + if (getAuditType(result) != ENTITY_AUDIT_V2) { + Referenceable referenceable = AtlasType.fromV1Json(ret, Referenceable.class); + AtlasEntity entity = toAtlasEntity(referenceable); + + ret = AtlasType.toJson(entity); + } + + return ret; + } + + private String getEntityDetails(Result result) throws AtlasBaseException { + String ret; + + if (getAuditType(result) == ENTITY_AUDIT_V2) { + ret = getResultString(result, COLUMN_DETAIL); + } else { + // convert v1 audit detail to v2 + ret = getV2Details(result); + } + + return ret; + } + + private EntityAuditType getAuditType(Result result) { + String typeString = getResultString(result, COLUMN_TYPE); + EntityAuditType ret = (typeString != null) ? EntityAuditType.valueOf(typeString) : ENTITY_AUDIT_V1; + + return ret; + } + + private String getV2Details(Result result) throws AtlasBaseException { + String ret = null; + String v1DetailsWithPrefix = getResultString(result, COLUMN_DETAIL); + + if (StringUtils.isNotEmpty(v1DetailsWithPrefix)) { + EntityAuditAction v1AuditAction = EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION)); + String v1AuditPrefix = EntityAuditListener.getV1AuditPrefix(v1AuditAction); + String[] split = v1DetailsWithPrefix.split(v1AuditPrefix); + + if (ArrayUtils.isNotEmpty(split) && split.length == 2) { + String v1AuditDetails = split[1]; + Referenceable referenceable = AtlasType.fromV1Json(v1AuditDetails, Referenceable.class); + String v2Json = (referenceable != null) ? toV2Json(referenceable, v1AuditAction) : v1AuditDetails; + + if (v2Json != null) { + ret = getV2AuditPrefix(v1AuditAction) + v2Json; + } + } else { + ret = v1DetailsWithPrefix; + } + } + + return ret; + } + + private String toV2Json(Referenceable referenceable, EntityAuditAction action) throws AtlasBaseException { + String ret; + + if (action == TAG_ADD || action == TAG_UPDATE || action == TAG_DELETE) { + AtlasClassification classification = instanceConverter.toAtlasClassification(referenceable); + + ret = AtlasType.toJson(classification); + } else { + AtlasEntity entity = toAtlasEntity(referenceable); + + ret = AtlasType.toJson(entity); + } + + return ret; + } + + private AtlasEntity toAtlasEntity(Referenceable referenceable) throws AtlasBaseException { + AtlasEntity ret = null; + AtlasEntitiesWithExtInfo entitiesWithExtInfo = instanceConverter.toAtlasEntity(referenceable); + + if (entitiesWithExtInfo != null && CollectionUtils.isNotEmpty(entitiesWithExtInfo.getEntities())) { + ret = entitiesWithExtInfo.getEntities().get(0); + } + + return ret; + } + private <T> void addColumn(Put put, byte[] columnName, T columnValue) { if (columnValue != null && !columnValue.toString().isEmpty()) { put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString())); http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 7eda7e2..7d7e780 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -348,7 +348,7 @@ public class AtlasInstanceConverter { return ret; } - private EntityAuditEvent.EntityAuditAction getV1AuditAction(EntityAuditEventV2.EntityAuditAction v2AuditAction) { + private EntityAuditEvent.EntityAuditAction getV1AuditAction(EntityAuditEventV2.EntityAuditActionV2 v2AuditAction) { switch (v2AuditAction) { case ENTITY_CREATE: return EntityAuditEvent.EntityAuditAction.ENTITY_CREATE; @@ -379,32 +379,32 @@ public class AtlasInstanceConverter { return null; } - private EntityAuditEventV2.EntityAuditAction getV2AuditAction(EntityAuditEvent.EntityAuditAction v1AuditAction) { + private EntityAuditEventV2.EntityAuditActionV2 getV2AuditAction(EntityAuditEvent.EntityAuditAction v1AuditAction) { switch (v1AuditAction) { case ENTITY_CREATE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE; case ENTITY_UPDATE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE; case ENTITY_DELETE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE; case ENTITY_IMPORT_CREATE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_CREATE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE; case ENTITY_IMPORT_UPDATE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE; case ENTITY_IMPORT_DELETE: - return EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE; + return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE; case TAG_ADD: - return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_ADD; + return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD; case TAG_DELETE: - return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE; + return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE; case TAG_UPDATE: - return EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE; + return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE; case PROPAGATED_TAG_ADD: - return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; + return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; case PROPAGATED_TAG_DELETE: - return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; + return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; case PROPAGATED_TAG_UPDATE: - return EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_UPDATE; + return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE; } return null; http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 46b17c0..083600e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -24,7 +24,7 @@ import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListenerV2; -import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; @@ -57,8 +57,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_ADD; -import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.PROPAGATED_CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled; @@ -201,7 +201,7 @@ public class AtlasEntityChangeNotifier { notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE); } - private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditAction action) throws AtlasBaseException { + private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException { if (MapUtils.isEmpty(entityPropagationMap) || action == null) { return; } http://git-wip-us.apache.org/repos/asf/atlas/blob/d343a486/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java index 87ca849..aa175a2 100644 --- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java @@ -110,7 +110,7 @@ public class AuditRepositoryTestBase { @Test public void testAddEventsV2() throws Exception { EntityAuditEventV2 event = new EntityAuditEventV2(rand(), System.currentTimeMillis(), "u1", - EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE, "d1", new AtlasEntity(rand())); + EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "d1", new AtlasEntity(rand())); eventRepository.putEventsV2(event); @@ -131,12 +131,12 @@ public class AuditRepositoryTestBase { for (int i = 0; i < 3; i++) { //Add events for both ids - EntityAuditEventV2 event = new EntityAuditEventV2(id2, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE, "details" + i, entity); + EntityAuditEventV2 event = new EntityAuditEventV2(id2, ts - i, "user" + i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + i, entity); eventRepository.putEventsV2(event); expectedEvents.add(event); - eventRepository.putEventsV2(new EntityAuditEventV2(id1, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE, "details" + i, entity)); - eventRepository.putEventsV2(new EntityAuditEventV2(id3, ts - i, "user" + i, EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE, "details" + i, entity)); + eventRepository.putEventsV2(new EntityAuditEventV2(id1, ts - i, "user" + i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + i, entity)); + eventRepository.putEventsV2(new EntityAuditEventV2(id3, ts - i, "user" + i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details" + i, entity)); } //Use ts for which there is no event - ts + 2