This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch ATLAS-5187 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit fcd5e89e9545767716731d311c3ec05881345cac Author: Pinal Shah <[email protected]> AuthorDate: Fri Jan 16 10:46:14 2026 +0530 ATLAS-5187: Fix audit related issues when rdbms backend is selected --- .../audit/rdbms/dao/DbEntityAuditDao.java | 92 ++++++++++-- .../audit/rdbms/entity/DbEntityAudit.java | 25 +++- .../resources/META-INF/postgres/create_schema.sql | 2 +- .../audit/rdbms/RdbmsBasedAuditRepository.java | 159 ++++++++++++++++++++- 4 files changed, 255 insertions(+), 23 deletions(-) diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java index 4f2bebbb4..90c3db3ab 100644 --- a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java +++ b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java @@ -17,7 +17,9 @@ */ package org.apache.atlas.repository.audit.rdbms.dao; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; +import org.apache.commons.collections.CollectionUtils; import javax.persistence.EntityManager; import javax.persistence.NoResultException; @@ -30,15 +32,24 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> { super(em); } - public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String entityId, int action, long eventTimeStart, int eventIdxStart, int maxResults) { + public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String entityId, String action, long eventTimeStart, int eventIdxStart, int maxResults) { try { - return em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx", DbEntityAudit.class) - .setParameter("entityId", entityId) - .setParameter("action", action) - .setParameter("eventTimeStart", eventTimeStart) - .setParameter("eventIdxStart", eventIdxStart) - .setMaxResults(maxResults) - .getResultList(); + if (action == null) { + return em.createNamedQuery("DbEntityAudit.getByEntityIdStartTimeStartIdx", DbEntityAudit.class) + .setParameter("entityId", entityId) + .setParameter("eventTimeStart", eventTimeStart) + .setParameter("eventIdxStart", eventIdxStart) + .setMaxResults(maxResults) + .getResultList(); + } else { + return em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx", DbEntityAudit.class) + .setParameter("entityId", entityId) + .setParameter("action", action) + .setParameter("eventTimeStart", eventTimeStart) + .setParameter("eventIdxStart", eventIdxStart) + .setMaxResults(maxResults) + .getResultList(); + } } catch (NoResultException excp) { // ignore } @@ -46,18 +57,27 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> { return Collections.emptyList(); } - public List<DbEntityAudit> getByEntityIdAction(String entityId, Integer action, int startIdx, int maxResults) { + public List<DbEntityAudit> getByEntityIdAction(String entityId, String action, List<String> sortByColumn, boolean sortOrder, int startIdx, int maxResults) { try { - if (action == null) { - return em.createNamedQuery("DbEntityAudit.getByEntityId", DbEntityAudit.class) + StringBuilder query = new StringBuilder("SELECT e FROM DbEntityAudit e WHERE e.entityId = :entityId"); + + if (action != null) { + query.append(" and e.action = :action"); + if (CollectionUtils.isNotEmpty(sortByColumn)) { + query.append(getOrderByQuery(sortByColumn, sortOrder)); + } + return em.createQuery(query.toString(), DbEntityAudit.class) .setParameter("entityId", entityId) + .setParameter("action", action) .setFirstResult(startIdx) .setMaxResults(maxResults) .getResultList(); } else { - return em.createNamedQuery("DbEntityAudit.getByEntityIdAction", DbEntityAudit.class) + if (CollectionUtils.isNotEmpty(sortByColumn)) { + query.append(getOrderByQuery(sortByColumn, sortOrder)); + } + return em.createQuery(query.toString(), DbEntityAudit.class) .setParameter("entityId", entityId) - .setParameter("action", action) .setFirstResult(startIdx) .setMaxResults(maxResults) .getResultList(); @@ -68,4 +88,50 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> { return Collections.emptyList(); } + + public String getOrderByQuery(List<String> sortByColumn, boolean sortOrderDesc) { + StringBuilder orderByQuery = new StringBuilder(" ORDER BY "); + for (int i = 0; i < sortByColumn.size(); i++) { + orderByQuery.append("e.").append(sortByColumn.get(i)); + orderByQuery.append(sortOrderDesc ? " DESC" : " ASC"); + if (i != sortByColumn.size() - 1) { + orderByQuery.append(", "); + } + } + return orderByQuery.toString(); + } + + public List<DbEntityAudit> getLatestAuditsByEntityIdAction(String entityId, String action, boolean createEventsAgeoutAllowed) { + try { + StringBuilder query = new StringBuilder("SELECT e FROM DbEntityAudit e WHERE e.entityId = :entityId"); + if (!createEventsAgeoutAllowed) { + String createEvent = EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE.name(); + String importCreateEvent = EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE.name(); + + if (action != null && (action.equals(createEvent) || action.equals(importCreateEvent))) { //if action is create event and create events ageout is not allowed, return empty list + return Collections.emptyList(); + } + + query.append(" and e.action NOT IN ('").append(createEvent).append("','").append(importCreateEvent).append("')"); + } + + if (action != null) { + query.append(" and e.action = :action"); + query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC"); + return em.createQuery(query.toString(), DbEntityAudit.class) + .setParameter("entityId", entityId) + .setParameter("action", action) + .getResultList(); + } else { + query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC"); + return em.createQuery(query.toString(), DbEntityAudit.class) + .setParameter("entityId", entityId) + .getResultList(); + } + } catch (NoResultException excp) { + // ignore + } + + return Collections.emptyList(); + } } diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java index e5c9005e5..f9c207bc8 100644 --- a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java +++ b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java @@ -30,6 +30,8 @@ import javax.persistence.Table; import java.util.Objects; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; + /** * RDBMS representation of a JanusGraph Column - name/value pair in a JanusGraph key * @@ -61,8 +63,8 @@ public class DbEntityAudit implements java.io.Serializable { @Column(name = "user_name", nullable = false, length = 64) protected String user; - @Column(name = "operation", nullable = false) - protected int action; + @Column(name = "operation", nullable = false, length = 64) + protected String action; @Column(name = "details") @Lob @@ -115,14 +117,27 @@ public class DbEntityAudit implements java.io.Serializable { this.user = user; } - public int getAction() { + public String getAction() { return action; } - public void setAction(int action) { + public void setAction(String action) { this.action = action; } + public void setAction(EntityAuditActionV2 actionEnum) { + this.action = (actionEnum == null) ? null : actionEnum.name(); + } + + public EntityAuditActionV2 getActionEnum() { + if (action == null) return null; + try { + return EntityAuditActionV2.valueOf(action); + } catch (IllegalArgumentException e) { + return null; + } + } + public String getDetails() { return details; } @@ -164,7 +179,7 @@ public class DbEntityAudit implements java.io.Serializable { eventTime == other.eventTime && eventIndex == other.eventIndex && Objects.equals(user, other.user) && - action == other.action && + Objects.equals(action, other.action) && Objects.equals(details, other.details) && Objects.equals(entity, other.entity) && auditType == other.auditType; diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql index 1327ae6e3..291c0ecb1 100644 --- a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql +++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql @@ -16,7 +16,7 @@ -- DB objects for Atlas entity audit CREATE SEQUENCE IF NOT EXISTS atlas_entity_audit_seq INCREMENT BY 1 CACHE 1000; -CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64) NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name VARCHAR(64) NOT NULL, operation INT NOT NULL, details TEXT DEFAULT NULL, entity TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id)); +CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64) NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name VARCHAR(64) NOT NULL, operation VARCHAR(64) NOT NULL, details TEXT DEFAULT NULL, entity TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id)); CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_entity_id ON atlas_entity_audit (entity_id); CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_event_time ON atlas_entity_audit (event_time); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java index 801123ace..20fddccf8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java @@ -18,30 +18,49 @@ package org.apache.atlas.repository.audit.rdbms; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository; import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao; import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.inject.Singleton; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_ACTION; +import static org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_TIMESTAMP; +import static org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_USER; + @Singleton @Component @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = false) @Order(0) public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditRepository { + private static final Logger LOG = LoggerFactory.getLogger(RdbmsBasedAuditRepository.class); + @Override public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { throw new UnsupportedOperationException(); @@ -77,7 +96,7 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito try (RdbmsTransaction trx = new RdbmsTransaction()) { DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); - List<DbEntityAudit> dbEvents = dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction.ordinal(), getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount); + List<DbEntityAudit> dbEvents = dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction == null ? null : auditAction.name(), getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount); return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); } catch (Exception excp) { @@ -90,7 +109,7 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito try (RdbmsTransaction trx = new RdbmsTransaction()) { DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); - List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, auditAction == null ? null : auditAction.ordinal(), offset, limit); + List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, auditAction == null ? null : auditAction.name(), mapSortColumnToDbColumn(sortByColumn), sortOrderDesc, offset, limit); return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); } catch (Exception excp) { @@ -112,13 +131,127 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito public void stop() throws AtlasException { } + @Override + public List<EntityAuditEventV2> deleteEventsV2(String entityId, + Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, + short allowedAuditCount, + int ttlInDays, + boolean createEventsAgeoutAllowed, + Constants.AtlasAuditAgingType auditAgingType) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEventsV2"); + + try (RdbmsTransaction trx = new RdbmsTransaction()) { + DbEntityAuditDao dao = new DbEntityAuditDao(trx.getEntityManager()); + List<EntityAuditEventV2> deletedEvents = new ArrayList<>(); + + boolean allowAgeoutByAuditCount = allowedAuditCount > 0 || (auditAgingType == Constants.AtlasAuditAgingType.SWEEP); + + List<DbEntityAudit> eventsEligibleForAgeout = new ArrayList<>(); + List<DbEntityAudit> eventsToKeep = new ArrayList<>(); + + if (CollectionUtils.isEmpty(entityAuditActions)) { + List<DbEntityAudit> dbEvents = dao.getLatestAuditsByEntityIdAction(entityId, null, createEventsAgeoutAllowed); + + if (!allowAgeoutByAuditCount) { + eventsToKeep.addAll(dbEvents); + } else { + int limit = Math.max(0, allowedAuditCount); + for (int i = 0; i < dbEvents.size(); i++) { + if (i < limit) { + eventsToKeep.add(dbEvents.get(i)); + } else { + eventsEligibleForAgeout.add(dbEvents.get(i)); + } + } + } + } else { + for (EntityAuditEventV2.EntityAuditActionV2 action : entityAuditActions) { + String actionName = action == null ? null : action.name(); + List<DbEntityAudit> dbEvents = dao.getLatestAuditsByEntityIdAction(entityId, actionName, createEventsAgeoutAllowed); + + if (!allowAgeoutByAuditCount) { + eventsToKeep.addAll(dbEvents); + } else { + int limit = Math.max(0, allowedAuditCount); + for (int i = 0; i < dbEvents.size(); i++) { + if (i < limit) { + eventsToKeep.add(dbEvents.get(i)); + } else { + eventsEligibleForAgeout.add(dbEvents.get(i)); + } + } + } + } + } + + if (CollectionUtils.isNotEmpty(eventsToKeep)) { + //Limit events based on configured audit count by grouping events of all action types + if (allowAgeoutByAuditCount && (auditAgingType == Constants.AtlasAuditAgingType.DEFAULT || CollectionUtils.isEmpty(entityAuditActions))) { + LOG.debug("Aging out audit events by audit count for entity: {}", entityId); + + eventsToKeep.sort((a, b) -> { + int cmp = Long.compare(b.getEventTime(), a.getEventTime()); + if (cmp == 0) { + cmp = Integer.compare(b.getEventIndex(), a.getEventIndex()); + } + return cmp; + }); + + if (allowedAuditCount < eventsToKeep.size()) { + eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount, eventsToKeep.size())); + eventsToKeep = eventsToKeep.subList(0, allowedAuditCount); + } + } + + //TTL based aging + if (ttlInDays > 0) { + LOG.debug("Aging out audit events by TTL for entity: {}", entityId); + + LocalDateTime now = LocalDateTime.now(); + boolean isTTLTestAutomation = AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean(); + long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation ? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime(); + + eventsToKeep.forEach(e -> { + if (e.getEventTime() < ttlTimestamp) { + eventsEligibleForAgeout.add(e); + } + }); + } + } + + Map<Long, DbEntityAudit> uniqueEligible = new LinkedHashMap<>(); + for (DbEntityAudit db : eventsEligibleForAgeout) { + if (db != null && db.getId() != null) { + uniqueEligible.put(db.getId(), db); + } + } + + LOG.debug("Deleting events from audit table: {}", uniqueEligible.keySet()); + for (DbEntityAudit dbEvent : uniqueEligible.values()) { + try { + trx.getEntityManager().remove(dbEvent); + deletedEvents.add(fromDbEntityAudit(dbEvent)); + } catch (Exception e) { + throw new AtlasBaseException("Failed to remove audit event: " + dbEvent, e); + } + } + + trx.commit(); + return deletedEvents; + } catch (Exception e) { + throw new AtlasBaseException("Error while deleting audit events", e); + } finally { + RequestContext.get().endMetricRecord(metric); + } + } + public static DbEntityAudit toDbEntityAudit(EntityAuditEventV2 event) { DbEntityAudit ret = new DbEntityAudit(); ret.setEntityId(event.getEntityId()); ret.setEventTime(event.getTimestamp()); ret.setUser(event.getUser()); - ret.setAction(event.getAction().ordinal()); + ret.setAction(event.getAction()); ret.setDetails(event.getDetails()); if (event.getType() == null) { @@ -140,7 +273,10 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito ret.setEntityId(dbEntityAudit.getEntityId()); ret.setTimestamp(dbEntityAudit.getEventTime()); ret.setUser(dbEntityAudit.getUser()); - ret.setAction(EntityAuditEventV2.EntityAuditActionV2.values()[dbEntityAudit.getAction()]); + EntityAuditEventV2.EntityAuditActionV2 actionEnum = dbEntityAudit.getActionEnum(); + if (actionEnum != null) { + ret.setAction(actionEnum); + } ret.setDetails(dbEntityAudit.getDetails()); ret.setType(EntityAuditEventV2.EntityAuditType.values()[dbEntityAudit.getAuditType()]); @@ -158,4 +294,19 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito throw new RuntimeException("Failed to get application properties", e); } } + + private List<String> mapSortColumnToDbColumn(String sortByColumn) { + if (StringUtils.isEmpty(sortByColumn)) { + return null; + } + switch (sortByColumn) { + case SORT_COLUMN_USER: + return java.util.Arrays.asList("user", "eventTime", "eventIndex"); + case SORT_COLUMN_ACTION: + return java.util.Arrays.asList("action", "eventTime", "eventIndex"); + case SORT_COLUMN_TIMESTAMP: + default: + return java.util.Arrays.asList("eventTime", "eventIndex"); + } + } }
