This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch ATLAS-5187
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit fcd5e89e9545767716731d311c3ec05881345cac
Author: Pinal Shah <[email protected]>
AuthorDate: Fri Jan 16 10:46:14 2026 +0530

    ATLAS-5187: Fix audit related issues when rdbms backend is selected
---
 .../audit/rdbms/dao/DbEntityAuditDao.java          |  92 ++++++++++--
 .../audit/rdbms/entity/DbEntityAudit.java          |  25 +++-
 .../resources/META-INF/postgres/create_schema.sql  |   2 +-
 .../audit/rdbms/RdbmsBasedAuditRepository.java     | 159 ++++++++++++++++++++-
 4 files changed, 255 insertions(+), 23 deletions(-)

diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
index 4f2bebbb4..90c3db3ab 100644
--- a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java
@@ -17,7 +17,9 @@
  */
 package org.apache.atlas.repository.audit.rdbms.dao;
 
+import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit;
+import org.apache.commons.collections.CollectionUtils;
 
 import javax.persistence.EntityManager;
 import javax.persistence.NoResultException;
@@ -30,15 +32,24 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> {
         super(em);
     }
 
-    public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String 
entityId, int action, long eventTimeStart, int eventIdxStart, int maxResults) {
+    public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String 
entityId, String action, long eventTimeStart, int eventIdxStart, int 
maxResults) {
         try {
-            return 
em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx", 
DbEntityAudit.class)
-                    .setParameter("entityId", entityId)
-                    .setParameter("action", action)
-                    .setParameter("eventTimeStart", eventTimeStart)
-                    .setParameter("eventIdxStart", eventIdxStart)
-                    .setMaxResults(maxResults)
-                    .getResultList();
+            if (action == null) {
+                return 
em.createNamedQuery("DbEntityAudit.getByEntityIdStartTimeStartIdx", 
DbEntityAudit.class)
+                        .setParameter("entityId", entityId)
+                        .setParameter("eventTimeStart", eventTimeStart)
+                        .setParameter("eventIdxStart", eventIdxStart)
+                        .setMaxResults(maxResults)
+                        .getResultList();
+            } else {
+                return 
em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx", 
DbEntityAudit.class)
+                        .setParameter("entityId", entityId)
+                        .setParameter("action", action)
+                        .setParameter("eventTimeStart", eventTimeStart)
+                        .setParameter("eventIdxStart", eventIdxStart)
+                        .setMaxResults(maxResults)
+                        .getResultList();
+            }
         } catch (NoResultException excp) {
             // ignore
         }
@@ -46,18 +57,27 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> {
         return Collections.emptyList();
     }
 
-    public List<DbEntityAudit> getByEntityIdAction(String entityId, Integer 
action, int startIdx, int maxResults) {
+    public List<DbEntityAudit> getByEntityIdAction(String entityId, String 
action, List<String> sortByColumn, boolean sortOrder, int startIdx, int 
maxResults) {
         try {
-            if (action == null) {
-                return em.createNamedQuery("DbEntityAudit.getByEntityId", 
DbEntityAudit.class)
+            StringBuilder query = new StringBuilder("SELECT e FROM 
DbEntityAudit e WHERE e.entityId = :entityId");
+
+            if (action != null) {
+                query.append(" and e.action = :action");
+                if (CollectionUtils.isNotEmpty(sortByColumn)) {
+                    query.append(getOrderByQuery(sortByColumn, sortOrder));
+                }
+                return em.createQuery(query.toString(), DbEntityAudit.class)
                         .setParameter("entityId", entityId)
+                        .setParameter("action", action)
                         .setFirstResult(startIdx)
                         .setMaxResults(maxResults)
                         .getResultList();
             } else {
-                return 
em.createNamedQuery("DbEntityAudit.getByEntityIdAction", DbEntityAudit.class)
+                if (CollectionUtils.isNotEmpty(sortByColumn)) {
+                    query.append(getOrderByQuery(sortByColumn, sortOrder));
+                }
+                return em.createQuery(query.toString(), DbEntityAudit.class)
                         .setParameter("entityId", entityId)
-                        .setParameter("action", action)
                         .setFirstResult(startIdx)
                         .setMaxResults(maxResults)
                         .getResultList();
@@ -68,4 +88,50 @@ public class DbEntityAuditDao extends BaseDao<DbEntityAudit> {
 
         return Collections.emptyList();
     }
+
+    public String getOrderByQuery(List<String> sortByColumn, boolean 
sortOrderDesc) {
+        StringBuilder orderByQuery = new StringBuilder(" ORDER BY ");
+        for (int i = 0; i < sortByColumn.size(); i++) {
+            orderByQuery.append("e.").append(sortByColumn.get(i));
+            orderByQuery.append(sortOrderDesc ? " DESC" : " ASC");
+            if (i != sortByColumn.size() - 1) {
+                orderByQuery.append(", ");
+            }
+        }
+        return orderByQuery.toString();
+    }
+
+    public List<DbEntityAudit> getLatestAuditsByEntityIdAction(String 
entityId, String action, boolean createEventsAgeoutAllowed) {
+        try {
+            StringBuilder query = new StringBuilder("SELECT e FROM 
DbEntityAudit e WHERE e.entityId = :entityId");
+            if (!createEventsAgeoutAllowed) {
+                String createEvent = 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE.name();
+                String importCreateEvent = 
EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE.name();
+
+                if (action != null && (action.equals(createEvent) || 
action.equals(importCreateEvent))) { //if action is create event and create 
events ageout is not allowed, return empty list
+                    return Collections.emptyList();
+                }
+
+                query.append(" and e.action NOT IN 
('").append(createEvent).append("','").append(importCreateEvent).append("')");
+            }
+
+            if (action != null) {
+                query.append(" and e.action = :action");
+                query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC");
+                return em.createQuery(query.toString(), DbEntityAudit.class)
+                        .setParameter("entityId", entityId)
+                        .setParameter("action", action)
+                        .getResultList();
+            } else {
+                query.append(" ORDER BY e.eventTime DESC, e.eventIndex DESC");
+                return em.createQuery(query.toString(), DbEntityAudit.class)
+                        .setParameter("entityId", entityId)
+                        .getResultList();
+            }
+        } catch (NoResultException excp) {
+            // ignore
+        }
+
+        return Collections.emptyList();
+    }
 }
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
index e5c9005e5..f9c207bc8 100644
--- a/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java
@@ -30,6 +30,8 @@ import javax.persistence.Table;
 
 import java.util.Objects;
 
+import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+
 /**
  * RDBMS representation of a JanusGraph Column - name/value pair in a 
JanusGraph key
  *
@@ -61,8 +63,8 @@ public class DbEntityAudit implements java.io.Serializable {
     @Column(name = "user_name", nullable = false, length = 64)
     protected String user;
 
-    @Column(name = "operation", nullable = false)
-    protected int action;
+    @Column(name = "operation", nullable = false, length = 64)
+    protected String action;
 
     @Column(name = "details")
     @Lob
@@ -115,14 +117,27 @@ public class DbEntityAudit implements java.io.Serializable {
         this.user = user;
     }
 
-    public int getAction() {
+    public String getAction() {
         return action;
     }
 
-    public void setAction(int action) {
+    public void setAction(String action) {
         this.action = action;
     }
 
+    public void setAction(EntityAuditActionV2 actionEnum) {
+        this.action = (actionEnum == null) ? null : actionEnum.name();
+    }
+
+    public EntityAuditActionV2 getActionEnum() {
+        if (action == null) return null;
+        try {
+            return EntityAuditActionV2.valueOf(action);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+
     public String getDetails() {
         return details;
     }
@@ -164,7 +179,7 @@ public class DbEntityAudit implements java.io.Serializable {
                     eventTime == other.eventTime &&
                     eventIndex == other.eventIndex &&
                     Objects.equals(user, other.user) &&
-                    action == other.action &&
+                    Objects.equals(action, other.action) &&
                     Objects.equals(details, other.details) &&
                     Objects.equals(entity, other.entity) &&
                     auditType == other.auditType;
diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
index 1327ae6e3..291c0ecb1 100644
--- a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
+++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql
@@ -16,7 +16,7 @@
 -- DB objects for Atlas entity audit
 CREATE SEQUENCE IF NOT EXISTS atlas_entity_audit_seq INCREMENT BY 1 CACHE 1000;
 
-CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64) 
NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name  
VARCHAR(64) NOT NULL, operation INT NOT NULL, details TEXT DEFAULT NULL, entity 
TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id));
+CREATE TABLE IF NOT EXISTS atlas_entity_audit(id BIGINT, entity_id VARCHAR(64) 
NOT NULL, event_time BIGINT NOT NULL, event_idx INT NOT NULL, user_name  
VARCHAR(64) NOT NULL, operation VARCHAR(64) NOT NULL, details TEXT DEFAULT 
NULL, entity TEXT DEFAULT NULL, audit_type INT NOT NULL, PRIMARY KEY(id));
 
 CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_entity_id            ON 
atlas_entity_audit (entity_id);
 CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_event_time           ON 
atlas_entity_audit (event_time);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
index 801123ace..20fddccf8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java
@@ -18,30 +18,49 @@
 package org.apache.atlas.repository.audit.rdbms;
 
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository;
 import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao;
 import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit;
+import org.apache.atlas.utils.AtlasPerfMetrics;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Singleton;
 
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_ACTION;
+import static 
org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.SORT_COLUMN_USER;
+
 @Singleton
 @Component
 @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", 
isDefault = false)
 @Order(0)
 public class RdbmsBasedAuditRepository extends 
AbstractStorageBasedAuditRepository {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RdbmsBasedAuditRepository.class);
+
     @Override
     public void putEventsV1(List<EntityAuditEvent> events) throws 
AtlasException {
         throw new UnsupportedOperationException();
@@ -77,7 +96,7 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito
         try (RdbmsTransaction trx = new RdbmsTransaction()) {
             DbEntityAuditDao dao = new 
DbEntityAuditDao(trx.getEntityManager());
 
-            List<DbEntityAudit> dbEvents = 
dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction.ordinal(), 
getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount);
+            List<DbEntityAudit> dbEvents = 
dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction == null ? null : 
auditAction.name(), getTimestampFromKey(startKey), getIndexFromKey(startKey), 
maxResultCount);
 
             return 
dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList());
         } catch (Exception excp) {
@@ -90,7 +109,7 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito
         try (RdbmsTransaction trx = new RdbmsTransaction()) {
             DbEntityAuditDao dao = new 
DbEntityAuditDao(trx.getEntityManager());
 
-            List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, 
auditAction == null ? null : auditAction.ordinal(), offset, limit);
+            List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, 
auditAction == null ? null : auditAction.name(), 
mapSortColumnToDbColumn(sortByColumn), sortOrderDesc, offset, limit);
 
             return 
dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList());
         } catch (Exception excp) {
@@ -112,13 +131,127 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito
     public void stop() throws AtlasException {
     }
 
+    @Override
+    public List<EntityAuditEventV2> deleteEventsV2(String entityId,
+            Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions,
+            short allowedAuditCount,
+            int ttlInDays,
+            boolean createEventsAgeoutAllowed,
+            Constants.AtlasAuditAgingType auditAgingType) throws 
AtlasBaseException {
+        AtlasPerfMetrics.MetricRecorder metric = 
RequestContext.get().startMetricRecord("deleteEventsV2");
+
+        try (RdbmsTransaction trx = new RdbmsTransaction()) {
+            DbEntityAuditDao dao = new 
DbEntityAuditDao(trx.getEntityManager());
+            List<EntityAuditEventV2> deletedEvents = new ArrayList<>();
+
+            boolean allowAgeoutByAuditCount = allowedAuditCount > 0 || 
(auditAgingType == Constants.AtlasAuditAgingType.SWEEP);
+
+            List<DbEntityAudit> eventsEligibleForAgeout = new ArrayList<>();
+            List<DbEntityAudit> eventsToKeep = new ArrayList<>();
+
+            if (CollectionUtils.isEmpty(entityAuditActions)) {
+                List<DbEntityAudit> dbEvents = 
dao.getLatestAuditsByEntityIdAction(entityId, null, createEventsAgeoutAllowed);
+
+                if (!allowAgeoutByAuditCount) {
+                    eventsToKeep.addAll(dbEvents);
+                } else {
+                    int limit = Math.max(0, allowedAuditCount);
+                    for (int i = 0; i < dbEvents.size(); i++) {
+                        if (i < limit) {
+                            eventsToKeep.add(dbEvents.get(i));
+                        } else {
+                            eventsEligibleForAgeout.add(dbEvents.get(i));
+                        }
+                    }
+                }
+            } else {
+                for (EntityAuditEventV2.EntityAuditActionV2 action : 
entityAuditActions) {
+                    String actionName = action == null ? null : action.name();
+                    List<DbEntityAudit> dbEvents = 
dao.getLatestAuditsByEntityIdAction(entityId, actionName, 
createEventsAgeoutAllowed);
+
+                    if (!allowAgeoutByAuditCount) {
+                        eventsToKeep.addAll(dbEvents);
+                    } else {
+                        int limit = Math.max(0, allowedAuditCount);
+                        for (int i = 0; i < dbEvents.size(); i++) {
+                            if (i < limit) {
+                                eventsToKeep.add(dbEvents.get(i));
+                            } else {
+                                eventsEligibleForAgeout.add(dbEvents.get(i));
+                            }
+                        }
+                    }
+                }
+            }
+
+            if (CollectionUtils.isNotEmpty(eventsToKeep)) {
+                //Limit events based on configured audit count by grouping 
events of all action types
+                if (allowAgeoutByAuditCount && (auditAgingType == 
Constants.AtlasAuditAgingType.DEFAULT || 
CollectionUtils.isEmpty(entityAuditActions))) {
+                    LOG.debug("Aging out audit events by audit count for 
entity: {}", entityId);
+
+                    eventsToKeep.sort((a, b) -> {
+                        int cmp = Long.compare(b.getEventTime(), 
a.getEventTime());
+                        if (cmp == 0) {
+                            cmp = Integer.compare(b.getEventIndex(), 
a.getEventIndex());
+                        }
+                        return cmp;
+                    });
+
+                    if (allowedAuditCount < eventsToKeep.size()) {
+                        
eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount, 
eventsToKeep.size()));
+                        eventsToKeep = eventsToKeep.subList(0, 
allowedAuditCount);
+                    }
+                }
+
+                //TTL based aging
+                if (ttlInDays > 0) {
+                    LOG.debug("Aging out audit events by TTL for entity: {}", 
entityId);
+
+                    LocalDateTime now = LocalDateTime.now();
+                    boolean isTTLTestAutomation = 
AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean();
+                    long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation 
? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime();
+
+                    eventsToKeep.forEach(e -> {
+                        if (e.getEventTime() < ttlTimestamp) {
+                            eventsEligibleForAgeout.add(e);
+                        }
+                    });
+                }
+            }
+
+            Map<Long, DbEntityAudit> uniqueEligible = new LinkedHashMap<>();
+            for (DbEntityAudit db : eventsEligibleForAgeout) {
+                if (db != null && db.getId() != null) {
+                    uniqueEligible.put(db.getId(), db);
+                }
+            }
+
+            LOG.debug("Deleting events from audit table: {}", 
uniqueEligible.keySet());
+            for (DbEntityAudit dbEvent : uniqueEligible.values()) {
+                try {
+                    trx.getEntityManager().remove(dbEvent);
+                    deletedEvents.add(fromDbEntityAudit(dbEvent));
+                } catch (Exception e) {
+                    throw new AtlasBaseException("Failed to remove audit 
event: " + dbEvent, e);
+                }
+            }
+
+            trx.commit();
+            return deletedEvents;
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error while deleting audit events", 
e);
+        } finally {
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
     public static DbEntityAudit toDbEntityAudit(EntityAuditEventV2 event) {
         DbEntityAudit ret = new DbEntityAudit();
 
         ret.setEntityId(event.getEntityId());
         ret.setEventTime(event.getTimestamp());
         ret.setUser(event.getUser());
-        ret.setAction(event.getAction().ordinal());
+        ret.setAction(event.getAction());
         ret.setDetails(event.getDetails());
 
         if (event.getType() == null) {
@@ -140,7 +273,10 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito
         ret.setEntityId(dbEntityAudit.getEntityId());
         ret.setTimestamp(dbEntityAudit.getEventTime());
         ret.setUser(dbEntityAudit.getUser());
-        
ret.setAction(EntityAuditEventV2.EntityAuditActionV2.values()[dbEntityAudit.getAction()]);
+        EntityAuditEventV2.EntityAuditActionV2 actionEnum = 
dbEntityAudit.getActionEnum();
+        if (actionEnum != null) {
+            ret.setAction(actionEnum);
+        }
         ret.setDetails(dbEntityAudit.getDetails());
         
ret.setType(EntityAuditEventV2.EntityAuditType.values()[dbEntityAudit.getAuditType()]);
 
@@ -158,4 +294,19 @@ public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditReposito
             throw new RuntimeException("Failed to get application properties", 
e);
         }
     }
+
+    private List<String> mapSortColumnToDbColumn(String sortByColumn) {
+        if (StringUtils.isEmpty(sortByColumn)) {
+            return null;
+        }
+        switch (sortByColumn) {
+            case SORT_COLUMN_USER:
+                return java.util.Arrays.asList("user", "eventTime", 
"eventIndex");
+            case SORT_COLUMN_ACTION:
+                return java.util.Arrays.asList("action", "eventTime", 
"eventIndex");
+            case SORT_COLUMN_TIMESTAMP:
+            default:
+                return java.util.Arrays.asList("eventTime", "eventIndex");
+        }
+    }
 }

Reply via email to