This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new 3d9edb34d ATLAS-5187: Fix audit related issues when rdbms backend is
selected (#498)
3d9edb34d is described below
commit 3d9edb34d2e7b7719c3c7f411dfd9ae09a3001ba
Author: Pinal Shah <[email protected]>
AuthorDate: Thu Feb 5 15:50:21 2026 +0530
ATLAS-5187: Fix audit related issues when rdbms backend is selected (#498)
(cherry picked from commit ff6e161dc3226be9a2d83370195e2c5a916c1c11)
---
.../audit/rdbms/dao/DbEntityAuditDao.java | 91 ++++++++++--
.../audit/rdbms/entity/DbEntityAudit.java | 10 +-
.../resources/META-INF/janus-jpa_named_queries.xml | 14 +-
.../resources/META-INF/postgres/create_schema.sql | 2 +-
.../audit/AbstractStorageBasedAuditRepository.java | 16 ++-
.../audit/CassandraBasedAuditRepository.java | 10 ++
.../audit/rdbms/RdbmsBasedAuditRepository.java | 159 ++++++++++++++++++++-
7 files changed, 262 insertions(+), 40 deletions(-)
diff --git
a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
index 4f2bebbb4..e6880344d 100644
---
a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
+++
b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.audit.rdbms.dao;
import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit;
+import org.apache.commons.collections.CollectionUtils;
import javax.persistence.EntityManager;
import javax.persistence.NoResultException;
@@ -30,15 +31,24 @@ public class DbEntityAuditDao extends
BaseDao<DbEntityAudit> {
super(em);
}
- public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String
entityId, int action, long eventTimeStart, int eventIdxStart, int maxResults) {
+ public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String
entityId, String action, long eventTimeStart, int eventIdxStart, int
maxResults) {
try {
- return
em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx",
DbEntityAudit.class)
- .setParameter("entityId", entityId)
- .setParameter("action", action)
- .setParameter("eventTimeStart", eventTimeStart)
- .setParameter("eventIdxStart", eventIdxStart)
- .setMaxResults(maxResults)
- .getResultList();
+ if (action == null) {
+ return
em.createNamedQuery("DbEntityAudit.getByEntityIdStartTimeStartIdx",
DbEntityAudit.class)
+ .setParameter("entityId", entityId)
+ .setParameter("eventTimeStart", eventTimeStart)
+ .setParameter("eventIdxStart", eventIdxStart)
+ .setMaxResults(maxResults)
+ .getResultList();
+ } else {
+ return
em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx",
DbEntityAudit.class)
+ .setParameter("entityId", entityId)
+ .setParameter("action", action)
+ .setParameter("eventTimeStart", eventTimeStart)
+ .setParameter("eventIdxStart", eventIdxStart)
+ .setMaxResults(maxResults)
+ .getResultList();
+ }
} catch (NoResultException excp) {
// ignore
}
@@ -46,18 +56,27 @@ public class DbEntityAuditDao extends
BaseDao<DbEntityAudit> {
return Collections.emptyList();
}
- public List<DbEntityAudit> getByEntityIdAction(String entityId, Integer
action, int startIdx, int maxResults) {
+ public List<DbEntityAudit> getByEntityIdAction(String entityId, String
action, List<String> sortByColumn, boolean sortOrder, int startIdx, int
maxResults) {
try {
- if (action == null) {
- return em.createNamedQuery("DbEntityAudit.getByEntityId",
DbEntityAudit.class)
+ StringBuilder query = new StringBuilder("SELECT e FROM
DbEntityAudit e WHERE e.entityId = :entityId");
+
+ if (action != null) {
+ query.append(" and e.action = :action");
+ if (CollectionUtils.isNotEmpty(sortByColumn)) {
+ query.append(getOrderByQuery(sortByColumn, sortOrder));
+ }
+ return em.createQuery(query.toString(), DbEntityAudit.class)
.setParameter("entityId", entityId)
+ .setParameter("action", action)
.setFirstResult(startIdx)
.setMaxResults(maxResults)
.getResultList();
} else {
- return
em.createNamedQuery("DbEntityAudit.getByEntityIdAction", DbEntityAudit.class)
+ if (CollectionUtils.isNotEmpty(sortByColumn)) {
+ query.append(getOrderByQuery(sortByColumn, sortOrder));
+ }
+ return em.createQuery(query.toString(), DbEntityAudit.class)
.setParameter("entityId", entityId)
- .setParameter("action", action)
.setFirstResult(startIdx)
.setMaxResults(maxResults)
.getResultList();
@@ -68,4 +87,50 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit>
{
return Collections.emptyList();
}
+
+ public String getOrderByQuery(List<String> sortByColumn, boolean
sortOrderDesc) {
+ StringBuilder orderByQuery = new StringBuilder(" ORDER BY ");
+ for (int i = 0; i < sortByColumn.size(); i++) {
+ orderByQuery.append("e.").append(sortByColumn.get(i));
+ orderByQuery.append(sortOrderDesc ? " DESC" : " ASC");
+ if (i != sortByColumn.size() - 1) {
+ orderByQuery.append(", ");
+ }
+ }
+ return orderByQuery.toString();
+ }
+
+ public List<DbEntityAudit> getLatestAuditsByEntityIdAction(String
entityId, String action, List<String> filterActions) {
+ try {
+ StringBuilder query = new StringBuilder("SELECT e FROM
DbEntityAudit e WHERE e.entityId = :entityId");
+
+ if (action != null) {
+ query.append(" and e.action = :action");
+ query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC");
+ return em.createQuery(query.toString(), DbEntityAudit.class)
+ .setParameter("entityId", entityId)
+ .setParameter("action", action)
+ .getResultList();
+ } else {
+ if (CollectionUtils.isNotEmpty(filterActions)) {
+ query.append(" and e.action NOT IN (");
+ for (int i = 0; i < filterActions.size(); i++) {
+
query.append("'").append(filterActions.get(i)).append("'");
+ if (i != filterActions.size() - 1) {
+ query.append(", ");
+ }
+ }
+ query.append(")");
+ }
+ query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC");
+ return em.createQuery(query.toString(), DbEntityAudit.class)
+ .setParameter("entityId", entityId)
+ .getResultList();
+ }
+ } catch (NoResultException excp) {
+ // ignore
+ }
+
+ return Collections.emptyList();
+ }
}
diff --git
a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
index e5c9005e5..0adbcda8c 100644
---
a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
+++
b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
@@ -61,8 +61,8 @@ public class DbEntityAudit implements java.io.Serializable {
@Column(name = "user_name", nullable = false, length = 64)
protected String user;
- @Column(name = "operation", nullable = false)
- protected int action;
+ @Column(name = "operation", nullable = false, length = 64)
+ protected String action;
@Column(name = "details")
@Lob
@@ -115,11 +115,11 @@ public class DbEntityAudit implements
java.io.Serializable {
this.user = user;
}
- public int getAction() {
+ public String getAction() {
return action;
}
- public void setAction(int action) {
+ public void setAction(String action) {
this.action = action;
}
@@ -164,7 +164,7 @@ public class DbEntityAudit implements java.io.Serializable {
eventTime == other.eventTime &&
eventIndex == other.eventIndex &&
Objects.equals(user, other.user) &&
- action == other.action &&
+ Objects.equals(action, other.action) &&
Objects.equals(details, other.details) &&
Objects.equals(entity, other.entity) &&
auditType == other.auditType;
diff --git
a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml
b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml
index 183b044cb..a0258f5ba 100644
---
a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml
+++
b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml
@@ -13,25 +13,19 @@
xmlns="http://java.sun.com/xml/ns/persistence/orm"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence/orm
http://java.sun.com/xml/ns/persistence/orm_1_0.xsd ">
- <named-query name="DbEntityAudit.getByEntityId">
- <query>SELECT obj FROM DbEntityAudit obj
- WHERE obj.entityId = :entityId
- ORDER BY obj.eventTime DESC, obj.eventIndex DESC
- </query>
- </named-query>
-
- <named-query name="DbEntityAudit.getByEntityIdAction">
+ <named-query name="DbEntityAudit.getByEntityIdActionStartTimeStartIdx">
<query>SELECT obj FROM DbEntityAudit obj
WHERE obj.entityId = :entityId
AND obj.action = :action
+ AND obj.eventTime >= :eventTimeStart
+ AND obj.eventIndex >= :eventIdxStart
ORDER BY obj.eventTime DESC, obj.eventIndex DESC
</query>
</named-query>
- <named-query name="DbEntityAudit.getByEntityIdActionStartTimeStartIdx">
+ <named-query name="DbEntityAudit.getByEntityIdStartTimeStartIdx">
<query>SELECT obj FROM DbEntityAudit obj
WHERE obj.entityId = :entityId
- AND obj.action = :action
AND obj.eventTime >= :eventTimeStart
AND obj.eventIndex >= :eventIdxStart
ORDER BY obj.eventTime DESC, obj.eventIndex DESC
diff --git
a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
index 1327ae6e3..291c0ecb1 100644
---
a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
+++
b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
@@ -16,7 +16,7 @@
-- DB objects for Atlas entity audit
CREATE SEQUENCE IF NOT EXISTS atlas_entity_audit_seq INCREMENT BY 1 CACHE 1000;
-CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64)
NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name
VARCHAR(64) NOT NULL, operation INT NOT NULL, details TEXT DEFAULT NULL, entity
TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id));
+CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64)
NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name
VARCHAR(64) NOT NULL, operation VARCHAR(64) NOT NULL, details TEXT DEFAULT
NULL, entity TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id));
CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_entity_id ON
atlas_entity_audit (entity_id);
CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_event_time ON
atlas_entity_audit (event_time);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
index a12e56d44..06bc461e5 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
@@ -24,10 +24,10 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.audit.EntityAuditEventV2;
-import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* This abstract base class should be used when adding support for an audit
storage backend.
@@ -83,11 +82,6 @@ public abstract class AbstractStorageBasedAuditRepository
implements Service, En
putEventsV2(Arrays.asList(events));
}
- @Override
- public List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
- return null;
- }
-
@Override
public List<Object> listEvents(String entityId, String startKey, short
maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, null, startKey, maxResults);
@@ -168,6 +162,10 @@ public abstract class AbstractStorageBasedAuditRepository
implements Service, En
}
protected long getTimestampFromKey(String key) {
+ if (StringUtils.isEmpty(key)) {
+ return 0L;
+ }
+
String[] parts = key.split(FIELD_SEPARATOR);
if (parts.length < 3) {
@@ -183,6 +181,10 @@ public abstract class AbstractStorageBasedAuditRepository
implements Service, En
}
protected int getIndexFromKey(String key) {
+ if (StringUtils.isEmpty(key)) {
+ return 0;
+ }
+
String[] parts = key.split(FIELD_SEPARATOR);
if (parts.length < 3) {
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
index 226828746..979fec798 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
@@ -28,7 +28,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -38,6 +40,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Singleton;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -203,6 +206,13 @@ public class CassandraBasedAuditRepository extends
AbstractStorageBasedAuditRepo
return listEventsV2(entityId, auditAction, null, limit);
}
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
Constants.AtlasAuditAgingType auditAgingType)
+ throws AtlasBaseException, AtlasException
+ {
+ return Collections.emptyList();
+ }
+
@Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long
toTimestamp) {
throw new NotImplementedException();
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
index 801123ace..77d145834 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
@@ -18,30 +18,50 @@
package org.apache.atlas.repository.audit.rdbms;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository;
import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao;
import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit;
+import org.apache.atlas.utils.AtlasPerfMetrics;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_ACTION;
+import static
org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_USER;
+
@Singleton
@Component
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl",
isDefault = false)
@Order(0)
public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditRepository {
+ private static final Logger LOG =
LoggerFactory.getLogger(RdbmsBasedAuditRepository.class);
+
@Override
public void putEventsV1(List<EntityAuditEvent> events) throws
AtlasException {
throw new UnsupportedOperationException();
@@ -77,7 +97,7 @@ public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditReposito
try (RdbmsTransaction trx = new RdbmsTransaction()) {
DbEntityAuditDao dao = new
DbEntityAuditDao(trx.getEntityManager());
- List<DbEntityAudit> dbEvents =
dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction.ordinal(),
getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount);
+ List<DbEntityAudit> dbEvents =
dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction == null ? null :
auditAction.name(), getTimestampFromKey(startKey), getIndexFromKey(startKey),
maxResultCount);
return
dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList());
} catch (Exception excp) {
@@ -90,7 +110,7 @@ public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditReposito
try (RdbmsTransaction trx = new RdbmsTransaction()) {
DbEntityAuditDao dao = new
DbEntityAuditDao(trx.getEntityManager());
- List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId,
auditAction == null ? null : auditAction.ordinal(), offset, limit);
+ List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId,
auditAction == null ? null : auditAction.name(),
mapSortColumnToDbColumn(sortByColumn), sortOrderDesc, offset, limit);
return
dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList());
} catch (Exception excp) {
@@ -112,13 +132,129 @@ public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditReposito
public void stop() throws AtlasException {
}
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId,
+ Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions,
+ short allowedAuditCount,
+ int ttlInDays,
+ boolean createEventsAgeoutAllowed,
+ Constants.AtlasAuditAgingType auditAgingType) throws
AtlasBaseException {
+ AtlasPerfMetrics.MetricRecorder metric =
RequestContext.get().startMetricRecord("deleteEventsV2");
+
+ try (RdbmsTransaction trx = new RdbmsTransaction()) {
+ DbEntityAuditDao dao = new
DbEntityAuditDao(trx.getEntityManager());
+ List<EntityAuditEventV2> deletedEvents = new ArrayList<>();
+
+ boolean allowAgeoutByAuditCount = allowedAuditCount > 0 ||
(auditAgingType == Constants.AtlasAuditAgingType.SWEEP);
+ List<String> ageoutActionsNotAllowed = createEventsAgeoutAllowed ?
Collections.emptyList() :
+
Arrays.asList(EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE.name(),
+
EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE.name());
+
+ List<DbEntityAudit> eventsEligibleForAgeout = new ArrayList<>();
+ List<DbEntityAudit> eventsToKeep = new ArrayList<>();
+
+ if (CollectionUtils.isEmpty(entityAuditActions)) {
+ List<DbEntityAudit> dbEvents =
dao.getLatestAuditsByEntityIdAction(entityId, null, ageoutActionsNotAllowed);
+
+ splitEventsToKeepAndAgeoutByAuditCount(dbEvents,
allowAgeoutByAuditCount, allowedAuditCount, eventsToKeep,
eventsEligibleForAgeout);
+ } else {
+ for (EntityAuditEventV2.EntityAuditActionV2 action :
entityAuditActions) {
+ String actionName = action == null ? null : action.name();
+ List<DbEntityAudit> dbEvents =
dao.getLatestAuditsByEntityIdAction(entityId, actionName,
ageoutActionsNotAllowed);
+
+ splitEventsToKeepAndAgeoutByAuditCount(dbEvents,
allowAgeoutByAuditCount, allowedAuditCount, eventsToKeep,
eventsEligibleForAgeout);
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(eventsToKeep)) {
+ //Limit events based on configured audit count by grouping
events of all action types
+ if (allowAgeoutByAuditCount && (auditAgingType ==
Constants.AtlasAuditAgingType.DEFAULT ||
CollectionUtils.isEmpty(entityAuditActions))) {
+ LOG.debug("Aging out audit events by audit count for
entity: {}", entityId);
+
+ eventsToKeep.sort((a, b) -> {
+ int cmp = Long.compare(b.getEventTime(),
a.getEventTime());
+ if (cmp == 0) {
+ cmp = Integer.compare(b.getEventIndex(),
a.getEventIndex());
+ }
+ return cmp;
+ });
+
+ if (allowedAuditCount < eventsToKeep.size()) {
+
eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount,
eventsToKeep.size()));
+ eventsToKeep = eventsToKeep.subList(0,
allowedAuditCount);
+ }
+ }
+
+ //TTL based aging
+ if (ttlInDays > 0) {
+ LOG.debug("Aging out audit events by TTL for entity: {}",
entityId);
+
+ LocalDateTime now = LocalDateTime.now();
+ boolean isTTLTestAutomation =
AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean();
+ long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation
? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime();
+
+ eventsToKeep.forEach(e -> {
+ if (e.getEventTime() < ttlTimestamp) {
+ eventsEligibleForAgeout.add(e);
+ }
+ });
+ }
+ }
+
+ Map<Long, DbEntityAudit> uniqueEligible = new LinkedHashMap<>();
+ for (DbEntityAudit db : eventsEligibleForAgeout) {
+ if (db != null && db.getId() != null) {
+ uniqueEligible.put(db.getId(), db);
+ }
+ }
+
+ LOG.debug("Deleting events from audit table: {}",
uniqueEligible.keySet());
+ for (DbEntityAudit dbEvent : uniqueEligible.values()) {
+ try {
+ trx.getEntityManager().remove(dbEvent);
+ deletedEvents.add(fromDbEntityAudit(dbEvent));
+ } catch (Exception e) {
+ throw new AtlasBaseException("Failed to remove audit
event: " + dbEvent, e);
+ }
+ }
+
+ trx.commit();
+ return deletedEvents;
+ } catch (Exception e) {
+ throw new AtlasBaseException("Error while deleting audit events",
e);
+ } finally {
+ RequestContext.get().endMetricRecord(metric);
+ }
+ }
+
+ private void splitEventsToKeepAndAgeoutByAuditCount(List<DbEntityAudit>
dbEvents, boolean allowAgeoutByAuditCount, int allowedAuditCount,
+ List<DbEntityAudit> eventsToKeep, List<DbEntityAudit>
eventsEligibleForAgeout) {
+
+ if (CollectionUtils.isEmpty(dbEvents)) {
+ return;
+ }
+
+ if (!allowAgeoutByAuditCount) {
+ eventsToKeep.addAll(dbEvents);
+ } else {
+ int limit = Math.max(0, allowedAuditCount);
+ for (int i = 0; i < dbEvents.size(); i++) {
+ if (i < limit) {
+ eventsToKeep.add(dbEvents.get(i));
+ } else {
+ eventsEligibleForAgeout.add(dbEvents.get(i));
+ }
+ }
+ }
+ }
+
public static DbEntityAudit toDbEntityAudit(EntityAuditEventV2 event) {
DbEntityAudit ret = new DbEntityAudit();
ret.setEntityId(event.getEntityId());
ret.setEventTime(event.getTimestamp());
ret.setUser(event.getUser());
- ret.setAction(event.getAction().ordinal());
+ ret.setAction(event.getAction() != null ? event.getAction().name() :
null);
ret.setDetails(event.getDetails());
if (event.getType() == null) {
@@ -140,7 +276,7 @@ public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditReposito
ret.setEntityId(dbEntityAudit.getEntityId());
ret.setTimestamp(dbEntityAudit.getEventTime());
ret.setUser(dbEntityAudit.getUser());
-
ret.setAction(EntityAuditEventV2.EntityAuditActionV2.values()[dbEntityAudit.getAction()]);
+ ret.setAction(dbEntityAudit.getAction() != null ?
EntityAuditEventV2.EntityAuditActionV2.valueOf(dbEntityAudit.getAction()) :
null);
ret.setDetails(dbEntityAudit.getDetails());
ret.setType(EntityAuditEventV2.EntityAuditType.values()[dbEntityAudit.getAuditType()]);
@@ -158,4 +294,19 @@ public class RdbmsBasedAuditRepository extends
AbstractStorageBasedAuditReposito
throw new RuntimeException("Failed to get application properties",
e);
}
}
+
+ private List<String> mapSortColumnToDbColumn(String sortByColumn) {
+ if (StringUtils.isEmpty(sortByColumn)) {
+ return null;
+ }
+ switch (sortByColumn) {
+ case SORT_COLUMN_USER:
+ return java.util.Arrays.asList("user", "eventTime",
"eventIndex");
+ case SORT_COLUMN_ACTION:
+ return java.util.Arrays.asList("action", "eventTime",
"eventIndex");
+ case SORT_COLUMN_TIMESTAMP:
+ default:
+ return java.util.Arrays.asList("eventTime", "eventIndex");
+ }
+ }
}