This is an automated email from the ASF dual-hosted git repository.

chaitalithombare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new ca4e036de ATLAS-4920: Atlas Auto Purge Feature (#406)
ca4e036de is described below

commit ca4e036de90dd02a0c62b89425f2e71f2f79e615
Author: achandel-01 <[email protected]>
AuthorDate: Mon Sep 22 16:38:07 2025 +0530

    ATLAS-4920: Atlas Auto Purge Feature (#406)
    
    resolved comments
    
    comments resolved
    
    resolved comments
    
    updated UTs for PurgeService
    
    fixed UT names
    
    fixed UT names again
    
    updated UTs
    
    updated UTs, UI, and function names
    
    checkstyle changes
    
    Co-authored-by: Abhinav Chandel <[email protected]>
---
 ...-base_model_add_atlas_operation_attributes.json |  19 +
 dashboardv2/public/js/utils/Enums.js               |   3 +-
 .../js/views/audit/AdminAuditTableLayoutView.js    |  10 +-
 dashboardv3/public/js/utils/Enums.js               |   3 +-
 .../js/views/audit/AdminAuditTableLayoutView.js    |  10 +-
 .../repository/graphdb/janus/AtlasJanusGraph.java  |   1 -
 .../apache/atlas/model/audit/AtlasAuditEntry.java  |   1 +
 .../repository/audit/EntityAuditListenerV2.java    |   2 +-
 .../repository/store/graph/AtlasEntityStore.java   |   8 +
 .../repository/store/graph/v1/DeleteHandlerV1.java | 159 +++++-
 .../store/graph/v2/AtlasEntityStoreV2.java         |  47 ++
 .../org/apache/atlas/services/PurgeService.java    | 537 +++++++++++++++++++++
 .../apache/atlas/services/PurgeServiceTest.java    | 220 +++++++++
 .../apache/atlas/web/resources/AdminResource.java  |  70 ++-
 .../atlas/web/resources/AdminResourceTest.java     |   4 +-
 15 files changed, 1080 insertions(+), 14 deletions(-)

diff --git 
a/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
 
b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
new file mode 100644
index 000000000..5bfe974f5
--- /dev/null
+++ 
b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
@@ -0,0 +1,19 @@
+{
+  "patches": [
+    {
+      "id": "TYPEDEF_PATCH_0007_001",
+      "description": "Add additional operations in Atlas",
+      "action": "UPDATE_ENUMDEF",
+      "typeName": "atlas_operation",
+      "applyToVersion": "1.1",
+      "updateToVersion": "1.2",
+      "params": null,
+      "elementDefs": [
+        {
+          "ordinal": 10,
+          "value": "AUTO_PURGE"
+        }
+      ]
+    }
+  ]
+}
diff --git a/dashboardv2/public/js/utils/Enums.js 
b/dashboardv2/public/js/utils/Enums.js
index 00bbe103d..fd3760ead 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) {
         BUSINESS_METADATA: "Business Metadata",
         PURGE: "Purge Entities",
         IMPORT: "Import Entities",
-        EXPORT: "Export Entities"
+        EXPORT: "Export Entities",
+        AUTO_PURGE: "Auto Purged Entities"
     }
 
     Enums.entityStateReadOnly = {
diff --git a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js 
b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
index f7918e82f..cc56e03cf 100644
--- a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
+++ b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
@@ -244,7 +244,7 @@ define(['require',
                             el.attr('colspan', '8');
                             if (results) {
                                 var adminValues = null;
-                                if (operation == "PURGE") {
+                                if (operation == "PURGE" || operation == 
"AUTO_PURGE") {
                                     adminText = 
that.displayPurgeAndImportAudits(auditData);
                                 } else if (operation == "EXPORT" || operation 
== "IMPORT") {
                                     adminText = 
that.displayExportAudits(auditData);
@@ -353,7 +353,7 @@ define(['require',
                 var adminValues = '<ul class="col-sm-6">',
                     guids = null,
                     adminTypDetails = Enums.category[obj.operation];
-                if (obj.operation == "PURGE") {
+                if (obj.operation == "PURGE" || obj.operation == "AUTO_PURGE") 
{
                     guids = obj.results ? obj.results.replace('[', 
'').replace(']', '').split(',') : guids;
                 } else {
                     guids = obj.model.get('params') ? 
obj.model.get('params').split(',') : guids;
@@ -428,9 +428,13 @@ define(['require',
             onClickAdminPurgedEntity: function(e) {
                 var that = this;
                 require(['views/audit/AuditTableLayoutView'], 
function(AuditTableLayoutView) {
+                    const titles = {
+                        PURGE: "Purged Entity Details",
+                        AUTO_PURGE: "Auto Purge Entity Details"
+                    };
                     var obj = {
                             guid: $(e.target).text(),
-                            titleText: (e.target.dataset.operation == "PURGE") 
? "Purged Entity Details: " : "Import Details: "
+                            titleText: (titles[e.target.dataset.operation] || 
"Import Details") + ": "
                         },
                         modalData = {
                             title: obj.titleText + obj.guid,
diff --git a/dashboardv3/public/js/utils/Enums.js 
b/dashboardv3/public/js/utils/Enums.js
index 00bbe103d..fd3760ead 100644
--- a/dashboardv3/public/js/utils/Enums.js
+++ b/dashboardv3/public/js/utils/Enums.js
@@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) {
         BUSINESS_METADATA: "Business Metadata",
         PURGE: "Purge Entities",
         IMPORT: "Import Entities",
-        EXPORT: "Export Entities"
+        EXPORT: "Export Entities",
+        AUTO_PURGE: "Auto Purged Entities"
     }
 
     Enums.entityStateReadOnly = {
diff --git a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js 
b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
index f7918e82f..cc56e03cf 100644
--- a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
+++ b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
@@ -244,7 +244,7 @@ define(['require',
                             el.attr('colspan', '8');
                             if (results) {
                                 var adminValues = null;
-                                if (operation == "PURGE") {
+                                if (operation == "PURGE" || operation == 
"AUTO_PURGE") {
                                     adminText = 
that.displayPurgeAndImportAudits(auditData);
                                 } else if (operation == "EXPORT" || operation 
== "IMPORT") {
                                     adminText = 
that.displayExportAudits(auditData);
@@ -353,7 +353,7 @@ define(['require',
                 var adminValues = '<ul class="col-sm-6">',
                     guids = null,
                     adminTypDetails = Enums.category[obj.operation];
-                if (obj.operation == "PURGE") {
+                if (obj.operation == "PURGE" || obj.operation == "AUTO_PURGE") 
{
                     guids = obj.results ? obj.results.replace('[', 
'').replace(']', '').split(',') : guids;
                 } else {
                     guids = obj.model.get('params') ? 
obj.model.get('params').split(',') : guids;
@@ -428,9 +428,13 @@ define(['require',
             onClickAdminPurgedEntity: function(e) {
                 var that = this;
                 require(['views/audit/AuditTableLayoutView'], 
function(AuditTableLayoutView) {
+                    const titles = {
+                        PURGE: "Purged Entity Details",
+                        AUTO_PURGE: "Auto Purge Entity Details"
+                    };
                     var obj = {
                             guid: $(e.target).text(),
-                            titleText: (e.target.dataset.operation == "PURGE") 
? "Purged Entity Details: " : "Import Details: "
+                            titleText: (titles[e.target.dataset.operation] || 
"Import Details") + ": "
                         },
                         modalData = {
                             title: obj.titleText + obj.guid,
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index e5016eb70..e0d4857a1 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -446,7 +446,6 @@ public class AtlasJanusGraph implements 
AtlasGraph<AtlasJanusVertex, AtlasJanusE
         return uniqueKeyHandler;
     }
 
-
     public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> 
wrapVertices(Iterable<? extends Vertex> it) {
         return Iterables.transform(it, (Function<Vertex, 
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input -> 
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input));
     }
diff --git 
a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java 
b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
index a103a1730..722c1a52e 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
@@ -136,6 +136,7 @@ public class AtlasAuditEntry extends AtlasBaseModelObject 
implements Serializabl
 
     public enum AuditOperation {
         PURGE("PURGE"),
+        AUTO_PURGE("AUTO_PURGE"),
         EXPORT("EXPORT"),
         IMPORT("IMPORT"),
         IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"),
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index f0ac8a9ec..82a02200d 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -407,7 +407,7 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     private EntityAuditEventV2 createEvent(EntityAuditEventV2 
entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String 
details) {
         entityAuditEventV2.setEntityId(entity.getGuid());
         entityAuditEventV2.setTimestamp(System.currentTimeMillis());
-        entityAuditEventV2.setUser(RequestContext.get().getUser());
+        entityAuditEventV2.setUser(RequestContext.get().getUser() != null ? 
RequestContext.get().getUser() : "admin");
         entityAuditEventV2.setAction(action);
         entityAuditEventV2.setDetails(details);
         entityAuditEventV2.setEntity(entity);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 457f86575..6f13175ce 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -28,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.EntityStream;
 import org.apache.atlas.type.AtlasEntityType;
 
@@ -226,6 +227,13 @@ public interface AtlasEntityStore {
      */
     EntityMutationResponse purgeByIds(Set<String> guids) throws 
AtlasBaseException;
 
+    /**
+     * Purges the given entities in a single batch and returns the mutation response for the purged entities
+     */
+    EntityMutationResponse purgeEntitiesInBatch(Set<String> deletedVertices) 
throws AtlasBaseException;
+
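+    /**
+     * Resolves the given guids to entity vertices and expands them to the full set of
+     * deletion-candidate vertices
+     */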
+    Set<AtlasVertex> accumulateDeletionCandidates(Set<String> vertices) throws 
AtlasBaseException;
+
     /**
      * Add classification(s)
      */
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 24c80c0a1..5d83f2934 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -63,6 +63,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -77,18 +78,22 @@ import static org.apache.atlas.model.TypeCategory.STRUCT;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.PURGED;
+import static 
org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATASET;
+import static 
org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_PROCESS;
 import static 
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
 import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
 import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
 import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
 import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER;
 import static 
org.apache.atlas.repository.Constants.EDGE_PENDING_TASKS_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
 import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
 import static 
org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY;
 import static 
org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
 import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
 import static 
org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
 import static 
org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
 import static 
org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
 import static 
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
@@ -127,6 +132,8 @@ public abstract class DeleteHandlerV1 {
     public static final Logger LOG = 
LoggerFactory.getLogger(DeleteHandlerV1.class);
 
     private static final boolean DEFERRED_ACTION_ENABLED = 
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
+    private static final String  PROCESS_OUTPUTS_EDGE               = 
"__Process.outputs";
+    private static final String  RELATIONSHIP_ATTRIBUTE_KEY_STRING  = 
"columnLineages";
 
     protected final GraphHelper graphHelper;
 
@@ -156,12 +163,22 @@ public abstract class DeleteHandlerV1 {
      * @throws AtlasBaseException
      */
     public void deleteEntities(Collection<AtlasVertex> instanceVertices) 
throws AtlasBaseException {
+        Set<AtlasVertex> deletionCandidateVertices = 
accumulateDeletionCandidates(instanceVertices);
+        deleteTraitsAndVertices(deletionCandidateVertices);
+    }
+
+    /*
+        accumulate the deletion candidates
+    */
+    public Set<AtlasVertex> 
accumulateDeletionCandidates(Collection<AtlasVertex> instanceVertices) throws 
AtlasBaseException {
         final RequestContext   requestContext            = 
RequestContext.get();
         final Set<AtlasVertex> deletionCandidateVertices = new HashSet<>();
         final boolean          isPurgeRequested          = 
requestContext.isPurgeRequested();
+        final Set<String>      instanceVertexGuids       = new HashSet<>();
 
         for (AtlasVertex instanceVertex : instanceVertices) {
             final String guid = 
AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
+            instanceVertexGuids.add(guid);
 
             if (skipVertexForDelete(instanceVertex)) {
                 if (LOG.isDebugEnabled()) {
@@ -186,15 +203,155 @@ public abstract class DeleteHandlerV1 {
                 requestContext.recordEntityDelete(entityHeader);
                 deletionCandidateVertices.add(vertexInfo.getVertex());
             }
+
+            AtlasEntityHeader entity = 
entityRetriever.toAtlasEntityHeader(instanceVertex);
+            AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+            if (entityType.getEntityDef().hasSuperType(ATLAS_TYPE_DATASET)) {
+                addUpstreamProcessEntities(instanceVertex, 
deletionCandidateVertices, instanceVertexGuids);
+            }
+
+            if (entityType.getEntityDef().hasSuperType(ATLAS_TYPE_PROCESS)) {
+                getColumnLineageEntities(instanceVertex, 
deletionCandidateVertices);
+            }
         }
+        return deletionCandidateVertices;
+    }
 
-        // Delete traits and vertices.
+    /*
+        delete the traits and then the vertex along with its references
+    */
+    public void deleteTraitsAndVertices(Collection<AtlasVertex> 
deletionCandidateVertices) throws AtlasBaseException {
         for (AtlasVertex deletionCandidateVertex : deletionCandidateVertices) {
             deleteAllClassifications(deletionCandidateVertex);
             deleteTypeVertex(deletionCandidateVertex, 
isInternalType(deletionCandidateVertex));
         }
     }
 
+    public void addUpstreamProcessEntities(AtlasVertex entityVertex, 
Set<AtlasVertex> deletionCandidateVertices, Set<String> instanceVertexGuids) 
throws AtlasBaseException {
+        RequestContext requestContext = RequestContext.get();
+
+        Iterator<AtlasEdge> edgeIterator = 
GraphHelper.getIncomingEdgesByLabel(entityVertex, PROCESS_OUTPUTS_EDGE);
+
+        String entityVertexGuid = entityVertex.getProperty(GUID_PROPERTY_KEY, 
String.class);
+
+        while (edgeIterator.hasNext()) {
+            AtlasEdge edge = edgeIterator.next();
+            AtlasVertex processVertex = edge.getOutVertex();
+
+            String guid = processVertex.getProperty(GUID_PROPERTY_KEY, 
String.class);
+            if (instanceVertexGuids.contains(guid)) {
+                return; // already added
+            }
+
+            boolean isEligible = isEligible(processVertex, entityVertexGuid, 
instanceVertexGuids);
+
+            if (isEligible) {
+                instanceVertexGuids.add(guid);
+
+                getColumnLineageEntities(processVertex, 
deletionCandidateVertices);
+
+                for (GraphHelper.VertexInfo vertexInfo : 
getOwnedVertices(processVertex)) {
+                    AtlasEntityHeader entityHeader = vertexInfo.getEntity();
+
+                    if (requestContext.isPurgeRequested()) {
+                        
entityHeader.setClassifications(entityRetriever.getAllClassifications(vertexInfo.getVertex()));
+                    }
+
+                    requestContext.recordEntityDelete(entityHeader);
+                    deletionCandidateVertices.add(vertexInfo.getVertex());
+                }
+            }
+        }
+    }
+
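+    /**
+     * Collects the column-lineage vertices attached to the given process vertex as additional
+     * deletion candidates.
+     */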
+    public void getColumnLineageEntities(AtlasVertex process, Set<AtlasVertex> 
deletionCandidateVertices) throws AtlasBaseException {
+        RequestContext requestContext = RequestContext.get();
+
+        AtlasEntityHeader entity = 
entityRetriever.toAtlasEntityHeader(process);
+        AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+        Map<String, Map<String, AtlasAttribute>> relationshipAttributes = 
entityType.getRelationshipAttributes();
+        Map<String, AtlasAttribute> columnLineages = 
relationshipAttributes.get(RELATIONSHIP_ATTRIBUTE_KEY_STRING);
+
+        if (columnLineages != null && !columnLineages.isEmpty()) {
+            AtlasAttribute atlasAttribute = 
columnLineages.values().iterator().next();
+            String relationshipEdgeLabel = 
atlasAttribute.getRelationshipEdgeLabel();
+
+            Iterator<AtlasEdge> edgeIterator = 
GraphHelper.getIncomingEdgesByLabel(process, relationshipEdgeLabel);
+
+            int addedCount = 0;
+
+            while (edgeIterator.hasNext()) {
+                AtlasVertex columnLineageVertex = 
edgeIterator.next().getOutVertex();
+                String typeName = 
columnLineageVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Column-lineage candidate: type={}, guid={}, 
state={}, viaEdgeLabel={}, process={}",
+                            typeName, getGuid(columnLineageVertex), 
getState(columnLineageVertex), relationshipEdgeLabel, string(process));
+                }
+
+                AtlasEntityHeader columnLineageEntityHeader = 
entityRetriever.toAtlasEntityHeader(columnLineageVertex);
+                requestContext.recordEntityDelete(columnLineageEntityHeader);
+                deletionCandidateVertices.add(columnLineageVertex);
+                addedCount++;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Collected {} column-lineage vertices via '{}' for 
process {}", addedCount, relationshipEdgeLabel, getGuid(process));
+            }
+        }
+    }
+
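+    /**
+     * A process vertex qualifies for removal only when no output other than the current entity
+     * is ACTIVE and, during a purge, all of its deleted outputs are part of the current batch.
+     */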
+    boolean isEligible(AtlasVertex processVertex, String entityVertexGuid, 
Set<String> instanceVertexGuids) throws AtlasBaseException {
+        RequestContext requestContext = RequestContext.get();
+
+        if (requestContext.isPurgeRequested() && getState(processVertex) == 
ACTIVE) {
+            // skip purging active process entities, for backward compatibility
+            return false;
+        }
+
+        Iterator<AtlasEdge> processEdges = 
GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE);
+
+        long countDeletedOutputs = 0;
+        long instanceDeleted = 0; // outputs that are part of the current purge batch
+
+        while (processEdges.hasNext()) {
+            // the process vertex can have deleted outgoing edges apart from the current dataset entity
+            AtlasVertex outputEntity = processEdges.next().getInVertex();
+            String outputEntityGuid = 
outputEntity.getProperty(GUID_PROPERTY_KEY, String.class); // output guid
+
+            if (getState(outputEntity) == ACTIVE && 
!entityVertexGuid.equals(outputEntityGuid)) {
+                return false;
+            }
+            countDeletedOutputs++;
+
+            if (requestContext.isPurgeRequested() && 
instanceVertexGuids.contains(outputEntityGuid)) {
+                instanceDeleted++; // to check whether all outputs are in instanceVertexGuids during purge
+            }
+        }
+
+        if (requestContext.isPurgeRequested() && countDeletedOutputs > 1) {
+            // purge the process only when every deleted output is part of this purge batch;
+            // otherwise skip it until all of its datasets are purged in a single scheduled job
+            return countDeletedOutputs == instanceDeleted;
+        }
+
+        return true;
+    }
+
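+    /**
+     * A process entity can be soft-deleted only when none of its outputs are ACTIVE.
+     */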
+    public static boolean isSoftDeletableProcess(AtlasVertex processVertex) {
+        Iterator<AtlasEdge> processEdges = 
GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE);
+
+        while (processEdges.hasNext()) {
+            AtlasVertex output = processEdges.next().getInVertex();
+
+            if (getState(output) == ACTIVE) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     /**
      * Delete the specified relationship edge.
      *
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index cdfa8b542..3dd45f98f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -577,6 +577,53 @@ public class AtlasEntityStoreV2 implements 
AtlasEntityStore {
         return ret;
     }
 
+    @Override
+    @GraphTransaction
+    public EntityMutationResponse purgeEntitiesInBatch(Set<String> 
purgeCandidates) throws AtlasBaseException {
+        LOG.info("==> purgeEntitiesInBatch()");
+
+        Collection<AtlasVertex> purgeVertices = new ArrayList<>();
+        EntityMutationResponse response      = new EntityMutationResponse();
+
+        RequestContext requestContext = RequestContext.get();
+        requestContext.setDeleteType(DeleteType.HARD); // hard delete
+        requestContext.setPurgeRequested(true);
+
+        for (String guid : purgeCandidates) {
+            AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid);
+            if (vertex != null) {
+                AtlasEntityHeader entityHeader = 
entityRetriever.toAtlasEntityHeader(vertex);
+                purgeVertices.add(vertex);
+                response.addEntity(PURGE, entityHeader);
+            }
+        }
+
+        deleteDelegate.getHandler().deleteTraitsAndVertices(purgeVertices);
+
+        entityChangeNotifier.onEntitiesMutated(response, false);
+
+        for (AtlasEntityHeader entity : response.getPurgedEntities()) {
+            LOG.info("Auto purged entity with guid {}", entity.getGuid());
+        }
+
+        LOG.info("<== purgeEntitiesInBatch()");
+
+        return response;
+    }
+
+    @Override
+    public Set<AtlasVertex> accumulateDeletionCandidates(Set<String> guids) 
throws AtlasBaseException {
+        LOG.info("==> accumulateDeletionCandidates() !");
+        Set<AtlasVertex> vertices = new HashSet<>();
+
+        for (String guid : guids) {
+            AtlasVertex vertex = entityRetriever.getEntityVertex(guid);
+            vertices.add(vertex);
+        }
+
+        return 
deleteDelegate.getHandler().accumulateDeletionCandidates(vertices);
+    }
+
     @Override
     @GraphTransaction
     public void addClassifications(final String guid, final 
List<AtlasClassification> classifications) throws AtlasBaseException {
diff --git 
a/repository/src/main/java/org/apache/atlas/services/PurgeService.java 
b/repository/src/main/java/org/apache/atlas/services/PurgeService.java
new file mode 100644
index 000000000..8f5de1f72
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/services/PurgeService.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.services;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.DeleteType;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.annotation.Timed;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.service.Service;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
+
+@AtlasService
+@Order(9)
+@Component
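+/**
+ * Launches a background cleanup that soft-deletes eligible process entities when enabled, and
+ * exposes purgeEntities() to purge soft-deleted entities past the configured retention period.
+ */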
+public class PurgeService implements Service {
+    private static final Logger LOG       = 
LoggerFactory.getLogger(PurgeService.class);
+    private static final Logger PERF_LOG  = 
AtlasPerfTracer.getPerfLogger("service.Purge");
+    private final AtlasGraph atlasGraph;
+    private static Configuration atlasProperties;
+    private final AtlasEntityStore entityStore;
+    private final AtlasTypeRegistry typeRegistry;
+
+    private static final String  ENABLE_PROCESS_SOFT_DELETION         = 
"atlas.enable.process.soft.delete";
+    private static final boolean ENABLE_PROCESS_SOFT_DELETION_DEFAULT = false;
+    private static final String  PURGE_ENABLED_SERVICE_TYPES          = 
"atlas.purge.enabled.services";
+    private static final String  SOFT_DELETE_ENABLED_PROCESS_TYPES    = 
"atlas.soft.delete.enabled.process.types";
+    private static final String  PURGE_BATCH_SIZE                     = 
"atlas.purge.batch.size";
+    private static final int     DEFAULT_PURGE_BATCH_SIZE             = 1000; // max entities fetched per query
+    private static final String  PURGE_WORKER_BATCH_SIZE              = 
"atlas.purge.worker.batch.size";
+    private static final int     DEFAULT_PURGE_WORKER_BATCH_SIZE      = 100;
+    private static final String  CLEANUP_WORKER_BATCH_SIZE            = 
"atlas.cleanup.worker.batch.size";
+    private static final int     DEFAULT_CLEANUP_WORKER_BATCH_SIZE    = 100;
+    private static final String  PURGE_RETENTION_PERIOD               = 
"atlas.purge.deleted.entity.retention.days";
+    private static final int     PURGE_RETENTION_PERIOD_DEFAULT       = 30; // 
days
+    private static final String  PURGE_WORKERS_COUNT                  = 
"atlas.purge.workers.count";
+    private static final int     DEFAULT_PURGE_WORKERS_COUNT          = 2;
+    private static final String  CLEANUP_WORKERS_COUNT                = 
"atlas.cleanup.workers.count";
+    private static final int     DEFAULT_CLEANUP_WORKERS_COUNT        = 2;
+    private static final String  PROCESS_ENTITY_CLEANER_THREAD_NAME   = 
"Process-Entity-Cleaner";
+    private final        String  indexSearchPrefix                    = 
AtlasGraphUtilsV2.getIndexSearchPrefix();
+    private static final int     DEFAULT_CLEANUP_BATCH_SIZE           = 1000;
+    private static final String  CLEANUP_WORKERS_NAME                 = 
"Process-Cleanup-Worker";
+    private static final String  PURGE_WORKERS_NAME                   = 
"Entity-Purge-Worker";
+    private static final String  DELETED                              = 
"DELETED";
+    private static final String  ACTIVE                               = 
"ACTIVE";
+    private static final String  AND_STR                              = " AND 
";
+
+    static {
+        try {
+            atlasProperties = ApplicationProperties.get();
+        } catch (Exception e) {
+            LOG.info("Failed to load application properties", e);
+        }
+    }
+
+    @Inject
+    public PurgeService(AtlasGraph atlasgraph, AtlasEntityStore entityStore, 
AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph   = atlasgraph;
+        this.entityStore  = entityStore;
+        this.typeRegistry = typeRegistry;
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        if (!getSoftDeletionFlag()) {
+            LOG.info("==> cleanup not enabled");
+            return;
+        }
+
+        LOG.info("==> PurgeService.start()");
+
+        launchCleanUp();
+
+        LOG.info("<== Launched the clean up thread");
+    }
+
+    @Override
+    public void stop() throws AtlasException {
+        LOG.info("==> stopping the purge service");
+    }
+
+    public void launchCleanUp() {
+        LOG.info("==> launching the new thread");
+
+        Thread thread = new Thread(
+                () -> {
+                    long startTime = System.currentTimeMillis();
+                    LOG.info("==> {} started", 
PROCESS_ENTITY_CLEANER_THREAD_NAME);
+                    softDeleteProcessEntities();
+                    LOG.info("==> exiting thread {}", 
PROCESS_ENTITY_CLEANER_THREAD_NAME);
+                    long endTime = System.currentTimeMillis();
+                    LOG.info("==> completed cleanup {} seconds !", (endTime - 
startTime) / 1000);
+                });
+
+        thread.setName(PROCESS_ENTITY_CLEANER_THREAD_NAME);
+        thread.start();
+        LOG.info("==> launched the thread for the clean up");
+    }
+
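+    /**
+     * Queries soft-deleted entities of the configured service types that are older than the
+     * retention period and purges them in parallel worker batches.
+     */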
+    @SuppressWarnings("unchecked")
+    @Timed
+    public EntityMutationResponse purgeEntities() {
+        LOG.info("==> PurgeService.purgeEntities()");
+        // index query of specific batch size
+        AtlasPerfTracer perf = null;
+        EntityMutationResponse entityMutationResponse = new 
EntityMutationResponse();
+        RequestContext requestContext = RequestContext.get();
+        requestContext.setDeleteType(DeleteType.HARD); // hard delete
+        requestContext.setPurgeRequested(true);
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"PurgeService.purgeEntities");
+            }
+
+            Set<String> allEligibleTypes = getEntityTypes();
+
+            try {
+                // fetch up to the configured purge batch size of soft-deleted entities eligible for purge
+                WorkItemsQualifier wiq = createQualifier(typeRegistry, 
entityStore, atlasGraph, getPurgeWorkerBatchSize(), getPurgeWorkersCount(), 
true);
+
+                String indexQuery = getBulkQueryString(allEligibleTypes, 
getPurgeRetentionPeriod());
+                Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, 
indexQuery).vertices(0, getPurgeBatchSize());
+                LOG.info("==>  fetched Deleted entities");
+
+                if (!itr.hasNext()) {
+                    LOG.info("==> no Purge Entities found");
+                    return entityMutationResponse;
+                }
+
+                Set<String> producedDeletionCandidates = new HashSet<>(); // guids already queued for purge
+
+                while (itr.hasNext()) {
+                    AtlasVertex vertex = itr.next().getVertex();
+
+                    if (vertex == null) {
+                        continue;
+                    }
+
+                    String guid = vertex.getProperty(GUID_PROPERTY_KEY, 
String.class);
+
+                    if (!producedDeletionCandidates.contains(guid)) {
+                        Set<String> instanceVertex = new HashSet<>();
+                        instanceVertex.add(guid);
+
+                        Set<AtlasVertex> deletionCandidates = 
entityStore.accumulateDeletionCandidates(instanceVertex);
+
+                        for (AtlasVertex deletionCandidate : 
deletionCandidates) {
+                            String deletionCandidateGuid = 
deletionCandidate.getProperty(GUID_PROPERTY_KEY, String.class);
+                            if 
(!producedDeletionCandidates.contains(deletionCandidateGuid)) {
+                                
producedDeletionCandidates.add(deletionCandidateGuid);
+                                wiq.checkProduce(deletionCandidate);
+                            }
+                        }
+                    }
+                }
+
+                wiq.shutdown();
+
+                // collecting all the results
+                Queue results = wiq.getResults();
+
+                LOG.info("==> Purged {} !", results.size());
+
+                while (!results.isEmpty()) {
+                    AtlasEntityHeader entityHeader = (AtlasEntityHeader) 
results.poll();
+                    if (entityHeader == null) {
+                        continue;
+                    }
+                    entityMutationResponse.addEntity(PURGE, entityHeader);
+                }
+            } catch (Exception ex) {
+                LOG.error("purge: failed!", ex);
+            } finally {
+                LOG.info("purge: Done!");
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        LOG.info("<== PurgeService.purgeEntities()");
+
+        return entityMutationResponse;
+    }
+
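+    /**
+     * Scans active process entities of the configured types and soft-deletes those whose
+     * outputs have all been deleted.
+     */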
+    @SuppressWarnings("unchecked")
+    @Timed
+    public void softDeleteProcessEntities() {
+        LOG.info("==> softDeleteProcessEntities()");
+
+        AtlasPerfTracer perf = null;
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"PurgeService.softDeleteProcessEntities");
+            }
+
+            Set<String> validProcessTypes = getProcessTypes();
+            try {
+                // fetch up to DEFAULT_CLEANUP_BATCH_SIZE active process entities per page
+                WorkItemsQualifier wiq = createQualifier(typeRegistry, 
entityStore, atlasGraph, getCleanupWorkerBatchSize(), getCleanUpWorkersCount(), 
false);
+                int offset = 0;
+                boolean moreResults = true;
+
+                while (moreResults) {
+                    String indexQuery = getBulkQueryString(validProcessTypes, 
0);
+                    Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, 
indexQuery).vertices(offset, DEFAULT_CLEANUP_BATCH_SIZE);
+                    LOG.info("==>  fetched entities");
+
+                    if (!itr.hasNext()) {
+                        moreResults = false;
+                    }
+
+                    while (itr.hasNext()) {
+                        AtlasVertex vertex = itr.next().getVertex();
+                        if (vertex != null) {
+                            wiq.checkProduce(vertex);
+                        }
+                    }
+
+                    offset += DEFAULT_CLEANUP_BATCH_SIZE;
+                    LOG.info("==> offset {}", offset);
+                }
+
+                wiq.shutdown();
+            } catch (Exception ex) {
+                LOG.error("cleanUp: failed!", ex);
+            } finally {
+                LOG.info("cleanUp: Done!");
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        LOG.info("<== softDeleteProcessEntities()");
+    }
+
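+    /**
+     * Work-item consumer that collects candidate guids into batches and either purges them or
+     * soft-deletes them via the entity store, depending on the configured mode.
+     */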
+    static class EntityQualifier extends WorkItemConsumer<AtlasVertex> {
+        private final Set<String> batch = new HashSet<>();
+        private final AtlasEntityStore entityStore;
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtlasGraph atlasGraph;
+        private final boolean isPurgeEnabled;
+        private int batchesProcessed;
+        private int batchSize;
+
+        public EntityQualifier(BlockingQueue<AtlasVertex> queue, 
AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasGraph 
atlasGraph, boolean isPurgeEnabled, int batchSize) {
+            super(queue);
+            this.typeRegistry     = typeRegistry;
+            this.entityStore      = entityStore;
+            this.atlasGraph       = atlasGraph;
+            this.isPurgeEnabled   = isPurgeEnabled;
+            this.batchesProcessed = 0;
+            this.batchSize        = batchSize;
+
+            if (isPurgeEnabled) {
+                LOG.info("==> consumers are purge enabled , batch size is {}", 
batchSize);
+            } else {
+                LOG.info("==> consumers are soft delete enabled , batch size 
is {}", batchSize);
+            }
+        }
+
+        @Override
+        protected void processItem(AtlasVertex vertex) {
+            String guid =  vertex.getProperty(GUID_PROPERTY_KEY, String.class);
+            LOG.info("==> processing the entity {}", guid);
+
+            try {
+                if (!isPurgeEnabled && !isEligible(vertex)) {
+                    return;
+                }
+                batch.add(guid);
+                commit();
+            } catch (Exception ex) {
+                LOG.info("{}", ex.getMessage());
+            }
+        }
+
+        @Override
+        protected void doCommit() {
+            if (batch.size() == batchSize) {
+                attemptCommit();
+            }
+        }
+
+        @Override
+        protected void commitDirty() {
+            if (!batch.isEmpty()) {
+                attemptCommit();
+            }
+
+            super.commitDirty();
+        }
+
+        protected void attemptCommit() {
+            EntityMutationResponse res;
+            List<AtlasEntityHeader> results = Collections.emptyList();
+
+            try {
+                if (isPurgeEnabled) {
+                    // purge the whole batch through the entity store
+                    res = entityStore.purgeEntitiesInBatch(batch);
+                } else {
+                    List<String> batchList = new ArrayList<>(batch);
+                    res = entityStore.deleteByIds(batchList);
+                }
+
+                results = isPurgeEnabled ? res.getPurgedEntities() : 
res.getDeletedEntities();
+
+                if (CollectionUtils.isEmpty(results)) {
+                    return;
+                }
+
+                for (AtlasEntityHeader entityHeader : results) {
+                    addResult(entityHeader); // adding results
+                }
+            } catch (Exception e) {
+                LOG.info("==> Exception: {}", e.getMessage());
+            } finally {
+                batchesProcessed++;
+                batch.clear();
+                LOG.info("==> Processed {} batch number with total {} entities 
purged!", batchesProcessed, results.size());
+            }
+        }
+    }
+
+    static class EntityQualifierBuilder implements 
WorkItemBuilder<EntityQualifier, AtlasVertex> {
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtlasEntityStore entityStore;
+        private final AtlasGraph atlasGraph;
+        private final boolean isPurgeEnabled;
+        private int batchSize;
+
+        public EntityQualifierBuilder(AtlasTypeRegistry typeRegistry, 
AtlasEntityStore entityStore, AtlasGraph atlasGraph, boolean isPurgeEnabled, 
int batchSize) {
+            this.typeRegistry   = typeRegistry;
+            this.entityStore    = entityStore;
+            this.atlasGraph     = atlasGraph;
+            this.isPurgeEnabled = isPurgeEnabled;
+            this.batchSize      = batchSize;
+        }
+
+        @Override
+        public EntityQualifier build(BlockingQueue<AtlasVertex> queue) {
+            return new EntityQualifier(queue, typeRegistry, entityStore, 
atlasGraph, isPurgeEnabled, batchSize);
+        }
+    }
+
+    static class WorkItemsQualifier extends WorkItemManager<AtlasVertex, 
EntityQualifier> {
+        public WorkItemsQualifier(WorkItemBuilder builder, int batchSize, int 
numWorkers, boolean isPurgeEnabled) {
+            super(builder, isPurgeEnabled ? PURGE_WORKERS_NAME : 
CLEANUP_WORKERS_NAME, batchSize, numWorkers, true);
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException {
+            LOG.info("==> Shutting down manager!");
+            super.shutdown();
+        }
+    }
+
+    public WorkItemsQualifier createQualifier(AtlasTypeRegistry typeRegistry, 
AtlasEntityStore entityStore, AtlasGraph atlasGraph, int batchSize, int 
numWorkers, boolean isPurgeEnabled) {
+        EntityQualifierBuilder eqb = new EntityQualifierBuilder(typeRegistry, 
entityStore, atlasGraph, isPurgeEnabled, batchSize);
+        LOG.info("==> creating the purge entity producer");
+        return new WorkItemsQualifier(eqb, batchSize, numWorkers, 
isPurgeEnabled);
+    }
+
+    public static boolean isEligible(AtlasVertex vertex) {
+        return DeleteHandlerV1.isSoftDeletableProcess(vertex);
+    }
+
+    private String getBulkQueryString(Set<String> typeNames, int 
retentionPeriod) {
+        String joinedTypes = typeNames.stream()
+                .map(t -> "\"" + t + "\"")
+                .collect(Collectors.joining(" OR ", "(", ")"));
+
+        String indexQuery = getString(retentionPeriod, joinedTypes);
+
+        LOG.info("bulk index query : {}", indexQuery);
+        return indexQuery;
+    }
+
+    private long timeThresholdMillis(int retentionPeriod) {
+        long currentTimeMillis = System.currentTimeMillis();
+        long retentionPeriodMillis = retentionPeriod * 24L * 60 * 60 * 1000;  
// Convert days to ms
+        return currentTimeMillis - retentionPeriodMillis;
+    }
+
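+    // Builds the free-text index query for the given types: DELETED entities older than the
+    // retention threshold when retentionDays > 0 (purge), otherwise ACTIVE entities (cleanup).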
+    private String getString(int retentionDays, String joinedTypes) {
+        String baseQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY 
+ "\": " + joinedTypes + AND_STR +
+                indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\": (%s)";
+
+        String indexQuery = (retentionDays > 0)
+                ? String.format(baseQuery + AND_STR + indexSearchPrefix + "\"" 
+ MODIFICATION_TIMESTAMP_PROPERTY_KEY + "\": [* TO %s]", DELETED, 
timeThresholdMillis(retentionDays))
+                : String.format(baseQuery, ACTIVE);
+
+        return indexQuery;
+    }
+
+    public boolean getSoftDeletionFlag() {
+        if (atlasProperties != null) {
+            return atlasProperties.getBoolean(ENABLE_PROCESS_SOFT_DELETION, 
ENABLE_PROCESS_SOFT_DELETION_DEFAULT);
+        }
+        return false;
+    }
+
+    private int getPurgeRetentionPeriod() {
+        int retentionPeriod = PURGE_RETENTION_PERIOD_DEFAULT;
+
+        if (atlasProperties != null) {
+            retentionPeriod = atlasProperties.getInt(PURGE_RETENTION_PERIOD, 
PURGE_RETENTION_PERIOD_DEFAULT);
+        }
+
+        return Math.max(PURGE_RETENTION_PERIOD_DEFAULT, retentionPeriod); // enforce a minimum retention period of 30 days
+    }
+
+    private Set<String> getProcessTypes() {
+        Set<String> processTypes = new HashSet<>();
+
+        if (atlasProperties != null) {
+            String[] eligibleTypes = 
atlasProperties.getStringArray(SOFT_DELETE_ENABLED_PROCESS_TYPES); // e.g. 
hive, spark
+            for (String type : eligibleTypes) {
+                if (typeRegistry.isRegisteredType(type)) {
+                    processTypes.add(type);
+                }
+            }
+        }
+
+        return processTypes;
+    }
+
+    public Set<String> getEntityTypes() {
+        Set<String> entityTypes = new HashSet<>();
+
+        if (atlasProperties != null) {
+            String[] eligibleServiceTypes = 
atlasProperties.getStringArray(PURGE_ENABLED_SERVICE_TYPES); // e.g. hive, spark
+            Set<String> serviceTypes = 
Arrays.stream(eligibleServiceTypes).collect(Collectors.toSet());
+
+            for (AtlasEntityDef entityDef : typeRegistry.getAllEntityDefs()) {
+                if (serviceTypes.contains(entityDef.getServiceType())) {
+                    entityTypes.add(entityDef.getName());
+                }
+            }
+        }
+
+        return entityTypes;
+    }
+
+    private int getPurgeBatchSize() {
+        if (atlasProperties != null) {
+            return atlasProperties.getInt(PURGE_BATCH_SIZE, 
DEFAULT_PURGE_BATCH_SIZE);
+        }
+        return DEFAULT_PURGE_BATCH_SIZE;
+    }
+
+    private int getPurgeWorkersCount() {
+        if (atlasProperties != null) {
+            return atlasProperties.getInt(PURGE_WORKERS_COUNT, 
DEFAULT_PURGE_WORKERS_COUNT);
+        }
+        return DEFAULT_PURGE_WORKERS_COUNT;
+    }
+
+    private int getCleanUpWorkersCount() {
+        if (atlasProperties != null) {
+            return atlasProperties.getInt(CLEANUP_WORKERS_COUNT, 
DEFAULT_CLEANUP_WORKERS_COUNT);
+        }
+        return DEFAULT_CLEANUP_WORKERS_COUNT;
+    }
+
+    private int getPurgeWorkerBatchSize() {
+        if (atlasProperties != null) {
+            return atlasProperties.getInt(PURGE_WORKER_BATCH_SIZE, 
DEFAULT_PURGE_WORKER_BATCH_SIZE);
+        }
+        return DEFAULT_PURGE_WORKER_BATCH_SIZE;
+    }
+
+    private int getCleanupWorkerBatchSize() {
+        if (atlasProperties != null) {
+            return atlasProperties.getInt(CLEANUP_WORKER_BATCH_SIZE, 
DEFAULT_CLEANUP_WORKER_BATCH_SIZE);
+        }
+        return DEFAULT_CLEANUP_WORKER_BATCH_SIZE;
+    }
+}
diff --git 
a/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java 
b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java
new file mode 100644
index 000000000..0f4e0482c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.services;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.DeleteType;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.TestLoadModelUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = org.apache.atlas.TestModules.TestOnlyModule.class)
+public class PurgeServiceTest extends AtlasTestBase {
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStoreV2 entityStore;
+
+    @Inject
+    private AtlasGraph atlasGraph;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        RequestContext.clear();
+        super.initialize();
+        TestLoadModelUtils.loadBaseModel(typeDefStore, typeRegistry);
+        TestLoadModelUtils.loadHiveModel(typeDefStore, typeRegistry);
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ignored) { }
+    }
+
+    @Test
+    public void testPurgeEntities() throws Exception {
+        // Approach (pre-commit check on async flow):
+        // - Spy store's IAtlasEntityChangeNotifier and capture 
onEntitiesMutated(response,false) to assert
+        //   the purged GUID before commit/locking paths.
+        // Create DB and table
+        AtlasEntity db = newHiveDb(null);
+        String dbGuid = persistAndGetGuid(db);
+        AtlasEntity tbl = newHiveTable(db, null);
+        String tblGuid = persistAndGetGuid(tbl);
+
+        // Soft-delete table
+        EntityMutationResponse del = 
entityStore.deleteByIds(Collections.singletonList(tblGuid));
+        assertNotNull(del);
+
+        // Backdate timestamp beyond default 30d retention and reindex to 
reflect in Solr
+        backdateModificationTimestamp(tblGuid, 31);
+        reindexVertices(tblGuid);
+        pauseForIndexCreation();
+
+        // Enable hive purge and single worker for determinism
+        
ApplicationProperties.get().setProperty("atlas.purge.enabled.services", "hive");
+        ApplicationProperties.get().setProperty("atlas.purge.workers.count", 
"1");
+        
ApplicationProperties.get().setProperty("atlas.purge.worker.batch.size", "1");
+
+        // Clear context to avoid skipping due to prior delete tracking
+        RequestContext.clear();
+
+        // Inject notifier spy to capture pre-commit signal (pre-commit 
verification point)
+        Object originalNotifier = injectNotifierSpy(entityStore);
+
+        new PurgeService(atlasGraph, entityStore, 
typeRegistry).purgeEntities();
+
+        // Verify notifier call and capture response before transaction commit 
(async-safe via timeout)
+        IAtlasEntityChangeNotifier spy = (IAtlasEntityChangeNotifier) 
getNotifier(entityStore);
+        ArgumentCaptor<EntityMutationResponse> cap = 
ArgumentCaptor.forClass(EntityMutationResponse.class);
+        Mockito.verify(spy, 
Mockito.timeout(5000)).onEntitiesMutated(cap.capture(), Mockito.eq(false));
+
+        EntityMutationResponse notified = cap.getValue();
+        assertNotNull(notified);
+        List<AtlasEntityHeader> purged = notified.getPurgedEntities();
+        assertNotNull(purged);
+
+        assertTrue(purged.stream().anyMatch(h -> tblGuid.equals(h.getGuid())));
+
+        // Restore original notifier to leave the store in a clean state
+        restoreNotifier(entityStore, originalNotifier);
+
+        // Flag assertions
+        assertTrue(RequestContext.get().isPurgeRequested());
+        assertEquals(RequestContext.get().getDeleteType(), DeleteType.HARD);
+    }
+
+    private AtlasEntity newHiveDb(String nameOpt) {
+        String name = nameOpt != null ? nameOpt : 
RandomStringUtils.randomAlphanumeric(10);
+        AtlasEntity db = new AtlasEntity("hive_db");
+        db.setAttribute("name", name);
+        db.setAttribute("qualifiedName", name);
+        db.setAttribute("clusterName", "cl1");
+        db.setAttribute("location", "/tmp");
+        db.setAttribute("description", "test db");
+        return db;
+    }
+
+    private AtlasEntity newHiveTable(AtlasEntity db, String nameOpt) {
+        String name = nameOpt != null ? nameOpt : 
RandomStringUtils.randomAlphanumeric(10);
+        AtlasEntity tbl = new AtlasEntity("hive_table");
+        tbl.setAttribute("name", name);
+        tbl.setAttribute("qualifiedName", name);
+        tbl.setAttribute("description", "random table");
+        tbl.setAttribute("type", "type");
+        tbl.setAttribute("tableType", "MANAGED");
+        tbl.setAttribute("db", AtlasTypeUtil.getAtlasObjectId(db));
+        return tbl;
+    }
+
+    private String persistAndGetGuid(AtlasEntity entity) throws AtlasBaseException {
+        EntityMutationResponse resp = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntityWithExtInfo(entity)), false);
+        String typeName = entity.getTypeName();
+        AtlasEntityHeader hdr = resp.getFirstCreatedEntityByTypeName(typeName);
+        String guid = hdr != null ? hdr.getGuid() : null;
+        return guid;
+    }
+
+    private void backdateModificationTimestamp(String guid, int days) {
+        AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, guid);
+        if (v != null) {
+            long delta = days * 24L * 60 * 60 * 1000;
+            long ts = System.currentTimeMillis() - delta;
+            AtlasGraphUtilsV2.setProperty(v, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, ts);
+        }
+    }
+
+    private void reindexVertices(String... guids) {
+        List<AtlasElement> elements = new ArrayList<>();
+        for (String g : guids) {
+            if (g == null) {
+                continue;
+            }
+            AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, g);
+            if (v != null) {
+                elements.add(v);
+            }
+        }
+        if (!elements.isEmpty()) {
+            try {
+                atlasGraph.getManagementSystem().reindex(Constants.VERTEX_INDEX, elements);
+                atlasGraph.getManagementSystem().reindex(Constants.FULLTEXT_INDEX, elements);
+            } catch (Exception ignored) { }
+        }
+    }
+
+    // Test helper: swap store's private notifier field with a Mockito spy so we can
+    // capture and assert the pre-commit mutation response invoked by the store.
+    private Object injectNotifierSpy(AtlasEntityStoreV2 storeV2) throws Exception {
+        Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+        f.setAccessible(true);
+        Object original = f.get(storeV2);
+        Object spy = Mockito.spy(original);
+        f.set(storeV2, spy);
+        return original;
+    }
+
+    // Test helper: fetch the (spied) notifier instance currently installed on the store.
+    private Object getNotifier(AtlasEntityStoreV2 storeV2) throws Exception {
+        Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+        f.setAccessible(true);
+        return f.get(storeV2);
+    }
+
+    // Test helper: restore the original notifier instance after verification.
+    private void restoreNotifier(AtlasEntityStoreV2 storeV2, Object original) throws Exception {
+        Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+        f.setAccessible(true);
+        f.set(storeV2, original);
+    }
+}
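
For reference, the purge knobs the test toggles through ApplicationProperties above are ordinary Atlas configuration properties. A minimal sketch of how a deployment might enable the feature in atlas-application.properties; the property names come from this change, while the values shown (and the exact meaning of the worker settings) are illustrative assumptions, not documented defaults:

    # override the effectively-disabled default cron and run the auto purge nightly at 02:00
    atlas.purge.cron.expression=0 0 2 * * ?
    # services whose soft-deleted entities become eligible for auto purge
    atlas.purge.enabled.services=hive
    # worker parallelism and batch size, as set in the test above
    atlas.purge.workers.count=1
    atlas.purge.worker.batch.size=100
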
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 06ff05339..7064cc0da 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -68,6 +68,7 @@ import org.apache.atlas.repository.impexp.ZipSink;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
+import org.apache.atlas.services.PurgeService;
 import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -121,6 +122,7 @@ import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -162,6 +164,10 @@ public class AdminResource {
     private static final List<String>  TIMEZONE_LIST                    = Arrays.asList(TimeZone.getAvailableIDs());
     private static final String        METRICS_PERSIST_INTERVAL         = "atlas.metrics.persist.schedule";
     private static final String        METRICS_PERSIST_INTERVAL_DEFAULT = "0 0 0/1 * * *";     // 1 hour interval
+    private static final String        PURGE_CRON_EXPRESSION            = "atlas.purge.cron.expression";
+    private static final String        PURGE_CRON_EXPRESSION_DEFAULT    = "* * * 30 2 ?"; // Feb 30 never occurs, so the schedule is disabled by default; user-controlled scheduling only
+    private static final String        ACTIVE                           = "ACTIVE";
+    private static final String        PURGE_THREAD_NAME                = "Scheduled-Purge-Thread";
     private static final Configuration atlasProperties;
 
     private final ServiceState               serviceState;
@@ -192,6 +198,8 @@ public class AdminResource {
     private final boolean                    isUiTasksTabEnabled;
     private final AtlasAuditReductionService auditReductionService;
     private       Response                   version;
+    private final PurgeService               purgeService;
+    private final ReentrantLock              cronPurgeOperationLock;
 
     @Context
     private HttpServletRequest httpServletRequest;
@@ -205,7 +213,8 @@ public class AdminResource {
             MigrationProgressService migrationProgressService, AtlasServerService serverService,
             ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
             AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository,
-            TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService, AtlasMetricsUtil atlasMetricsUtil) {
+            TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService, AtlasMetricsUtil atlasMetricsUtil,
+                         PurgeService purgeService) {
         this.serviceState              = serviceState;
         this.metricsService            = metricsService;
         this.exportService             = exportService;
@@ -224,6 +233,8 @@ public class AdminResource {
         this.debugMetricsRESTSink      = debugMetricsRESTSink;
         this.auditReductionService     = atlasAuditReductionService;
         this.atlasMetricsUtil          = atlasMetricsUtil;
+        this.purgeService              = purgeService;
+        this.cronPurgeOperationLock    = new ReentrantLock();
 
         if (atlasProperties != null) {
             this.defaultUIVersion            = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V3);
@@ -804,6 +815,63 @@ public class AdminResource {
         }
     }
 
+    @Scheduled(cron = "#{getPurgeCronExpression}")
+    public void schedulePurgeEntities() throws AtlasBaseException {
+        try {
+            Thread.currentThread().setName(PURGE_THREAD_NAME);
+            if (acquireCronPurgeOperationLock()) {
+                String state = serviceState.getState().toString();
+                LOG.info("==> Status of current node is {}", state);
+                if (state.equals(ACTIVE)) {
+                    LOG.info("==> Scheduled Purging has started");
+                    EntityMutationResponse entityMutationResponse = purgeService.purgeEntities();
+                    Set<String> guids = new HashSet<>();
+
+                    final List<AtlasEntityHeader> purgedEntities = entityMutationResponse.getPurgedEntities() != null
+                            ? entityMutationResponse.getPurgedEntities()
+                            : Collections.emptyList();
+
+                    if (CollectionUtils.isEmpty(purgedEntities)) {
+                        LOG.info("==> no entities got purged");
+                        return;
+                    }
+
+                    for (AtlasEntityHeader entityHeader : purgedEntities) {
+                        guids.add(entityHeader.getGuid());
+                    }
+
+                    LOG.info("==> Purged Entities {}", purgedEntities.size());
+
+                    auditService.add(AuditOperation.AUTO_PURGE, guids.toString(), entityMutationResponse.getPurgedEntitiesIds(),
+                            purgedEntities.size());
+
+                    LOG.info("==> Scheduled Purging has finished");
+                } else {
+                    LOG.info("==> Current node is not active, so skipping the scheduled purge");
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error while purging entities", e);
+            throw new AtlasBaseException(e);
+        } finally {
+            RequestContext.clear();
+            LOG.info("==> clearing the context");
+            if (cronPurgeOperationLock.isHeldByCurrentThread()) {
+                cronPurgeOperationLock.unlock();
+            }
+        }
+    }
+
+    @Bean
+    private String getPurgeCronExpression() {
+        if (atlasProperties != null) {
+            return atlasProperties.getString(PURGE_CRON_EXPRESSION, PURGE_CRON_EXPRESSION_DEFAULT);
+        }
+        return PURGE_CRON_EXPRESSION_DEFAULT;
+    }
+
+    private boolean acquireCronPurgeOperationLock() {
+        return cronPurgeOperationLock.tryLock();
+    }
+
     @POST
     @Path("/importfile")
     @Produces(Servlets.JSON_MEDIA_TYPE)
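
The scheduled purge above guards against overlapping runs with a non-blocking tryLock and only unlocks when the lock was actually acquired. A minimal, self-contained sketch of that guard pattern; the class and method names (PurgeJobSketch, runOnce) are illustrative and not part of the Atlas codebase:

    import java.util.concurrent.locks.ReentrantLock;

    public class PurgeJobSketch {
        private final ReentrantLock lock = new ReentrantLock();

        // In AdminResource the trigger comes from Spring's @Scheduled(cron = "#{getPurgeCronExpression}");
        // here it is a plain method so the sketch stands on its own.
        public void runOnce() {
            if (!lock.tryLock()) {
                return; // a previous purge run is still in progress; skip this trigger
            }
            try {
                // purge eligible entities, collect the purged GUIDs, record the AUTO_PURGE audit entry
            } finally {
                lock.unlock(); // safe here: tryLock() succeeded above, so this thread holds the lock
            }
        }
    }

Returning early when tryLock() fails keeps slow purge runs from piling up behind the cron trigger, which is also why the unlock in schedulePurgeEntities() is guarded by isHeldByCurrentThread().
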
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index f7e2c7ef7..aac34993e 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -48,7 +48,7 @@ public class AdminResourceTest {
     public void testStatusOfActiveServerIsReturned() throws IOException {
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
         Response      response      = adminResource.getStatus();
 
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
@@ -62,7 +62,7 @@ public class AdminResourceTest {
     public void testResourceGetsValueFromServiceState() throws IOException {
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
         Response      response      = adminResource.getStatus();
 
         verify(serviceState).getState();
