This is an automated email from the ASF dual-hosted git repository.
chaitalithombare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new ca4e036de ATLAS-4920: Atlas Auto Purge Feature (#406)
ca4e036de is described below
commit ca4e036de90dd02a0c62b89425f2e71f2f79e615
Author: achandel-01 <[email protected]>
AuthorDate: Mon Sep 22 16:38:07 2025 +0530
ATLAS-4920: Atlas Auto Purge Feature (#406)
resolved comments
comments resolved
resolved comments
updated UTs for PurgeService
fixed UT names
fixed UT names again
updated UTs
updated UTs, UI, and function names
checkstyle changes
Co-authored-by: Abhinav Chandel <[email protected]>
---
...-base_model_add_atlas_operation_attributes.json | 19 +
dashboardv2/public/js/utils/Enums.js | 3 +-
.../js/views/audit/AdminAuditTableLayoutView.js | 10 +-
dashboardv3/public/js/utils/Enums.js | 3 +-
.../js/views/audit/AdminAuditTableLayoutView.js | 10 +-
.../repository/graphdb/janus/AtlasJanusGraph.java | 1 -
.../apache/atlas/model/audit/AtlasAuditEntry.java | 1 +
.../repository/audit/EntityAuditListenerV2.java | 2 +-
.../repository/store/graph/AtlasEntityStore.java | 8 +
.../repository/store/graph/v1/DeleteHandlerV1.java | 159 +++++-
.../store/graph/v2/AtlasEntityStoreV2.java | 47 ++
.../org/apache/atlas/services/PurgeService.java | 537 +++++++++++++++++++++
.../apache/atlas/services/PurgeServiceTest.java | 220 +++++++++
.../apache/atlas/web/resources/AdminResource.java | 70 ++-
.../atlas/web/resources/AdminResourceTest.java | 4 +-
15 files changed, 1080 insertions(+), 14 deletions(-)
diff --git
a/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
new file mode 100644
index 000000000..5bfe974f5
--- /dev/null
+++
b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json
@@ -0,0 +1,19 @@
+{
+ "patches": [
+ {
+ "id": "TYPEDEF_PATCH_0007_001",
+ "description": "Add additional operations in Atlas",
+ "action": "UPDATE_ENUMDEF",
+ "typeName": "atlas_operation",
+ "applyToVersion": "1.1",
+ "updateToVersion": "1.2",
+ "params": null,
+ "elementDefs": [
+ {
+ "ordinal": 10,
+ "value": "AUTO_PURGE"
+ }
+ ]
+ }
+ ]
+}
diff --git a/dashboardv2/public/js/utils/Enums.js
b/dashboardv2/public/js/utils/Enums.js
index 00bbe103d..fd3760ead 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) {
BUSINESS_METADATA: "Business Metadata",
PURGE: "Purge Entities",
IMPORT: "Import Entities",
- EXPORT: "Export Entities"
+ EXPORT: "Export Entities",
+ AUTO_PURGE : "Auto Purged Entities"
}
Enums.entityStateReadOnly = {
diff --git a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
index f7918e82f..cc56e03cf 100644
--- a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
+++ b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js
@@ -244,7 +244,7 @@ define(['require',
el.attr('colspan', '8');
if (results) {
var adminValues = null;
- if (operation == "PURGE") {
+ if (operation == "PURGE" || operation ==
"AUTO_PURGE") {
adminText =
that.displayPurgeAndImportAudits(auditData);
} else if (operation == "EXPORT" || operation
== "IMPORT") {
adminText =
that.displayExportAudits(auditData);
@@ -353,7 +353,7 @@ define(['require',
var adminValues = '<ul class="col-sm-6">',
guids = null,
adminTypDetails = Enums.category[obj.operation];
- if (obj.operation == "PURGE") {
+ if (obj.operation == "PURGE" || obj.operation == "AUTO_PURGE")
{
guids = obj.results ? obj.results.replace('[',
'').replace(']', '').split(',') : guids;
} else {
guids = obj.model.get('params') ?
obj.model.get('params').split(',') : guids;
@@ -428,9 +428,13 @@ define(['require',
onClickAdminPurgedEntity: function(e) {
var that = this;
require(['views/audit/AuditTableLayoutView'],
function(AuditTableLayoutView) {
+ const titles = {
+ PURGE: "Purged Entity Details",
+ AUTO_PURGE: "Auto Purge Entity Details"
+ };
var obj = {
guid: $(e.target).text(),
- titleText: (e.target.dataset.operation == "PURGE")
? "Purged Entity Details: " : "Import Details: "
+ titleText: (titles[e.target.dataset.operation] ||
"Import Details") + ": "
},
modalData = {
title: obj.titleText + obj.guid,
diff --git a/dashboardv3/public/js/utils/Enums.js
b/dashboardv3/public/js/utils/Enums.js
index 00bbe103d..fd3760ead 100644
--- a/dashboardv3/public/js/utils/Enums.js
+++ b/dashboardv3/public/js/utils/Enums.js
@@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) {
BUSINESS_METADATA: "Business Metadata",
PURGE: "Purge Entities",
IMPORT: "Import Entities",
- EXPORT: "Export Entities"
+ EXPORT: "Export Entities",
+ AUTO_PURGE : "Auto Purged Entities"
}
Enums.entityStateReadOnly = {
diff --git a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
index f7918e82f..cc56e03cf 100644
--- a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
+++ b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js
@@ -244,7 +244,7 @@ define(['require',
el.attr('colspan', '8');
if (results) {
var adminValues = null;
- if (operation == "PURGE") {
+ if (operation == "PURGE" || operation ==
"AUTO_PURGE") {
adminText =
that.displayPurgeAndImportAudits(auditData);
} else if (operation == "EXPORT" || operation
== "IMPORT") {
adminText =
that.displayExportAudits(auditData);
@@ -353,7 +353,7 @@ define(['require',
var adminValues = '<ul class="col-sm-6">',
guids = null,
adminTypDetails = Enums.category[obj.operation];
- if (obj.operation == "PURGE") {
+ if (obj.operation == "PURGE" || obj.operation == "AUTO_PURGE")
{
guids = obj.results ? obj.results.replace('[',
'').replace(']', '').split(',') : guids;
} else {
guids = obj.model.get('params') ?
obj.model.get('params').split(',') : guids;
@@ -428,9 +428,13 @@ define(['require',
onClickAdminPurgedEntity: function(e) {
var that = this;
require(['views/audit/AuditTableLayoutView'],
function(AuditTableLayoutView) {
+ const titles = {
+ PURGE: "Purged Entity Details",
+ AUTO_PURGE: "Auto Purge Entity Details"
+ };
var obj = {
guid: $(e.target).text(),
- titleText: (e.target.dataset.operation == "PURGE")
? "Purged Entity Details: " : "Import Details: "
+ titleText: (titles[e.target.dataset.operation] ||
"Import Details") + ": "
},
modalData = {
title: obj.titleText + obj.guid,
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index e5016eb70..e0d4857a1 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -446,7 +446,6 @@ public class AtlasJanusGraph implements
AtlasGraph<AtlasJanusVertex, AtlasJanusE
return uniqueKeyHandler;
}
-
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>
wrapVertices(Iterable<? extends Vertex> it) {
return Iterables.transform(it, (Function<Vertex,
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input ->
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input));
}
diff --git
a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
index a103a1730..722c1a52e 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java
@@ -136,6 +136,7 @@ public class AtlasAuditEntry extends AtlasBaseModelObject
implements Serializabl
public enum AuditOperation {
PURGE("PURGE"),
+ AUTO_PURGE("AUTO_PURGE"),
EXPORT("EXPORT"),
IMPORT("IMPORT"),
IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"),
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index f0ac8a9ec..82a02200d 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -407,7 +407,7 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
private EntityAuditEventV2 createEvent(EntityAuditEventV2
entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String
details) {
entityAuditEventV2.setEntityId(entity.getGuid());
entityAuditEventV2.setTimestamp(System.currentTimeMillis());
- entityAuditEventV2.setUser(RequestContext.get().getUser());
+ entityAuditEventV2.setUser(RequestContext.get().getUser() != null ?
RequestContext.get().getUser() : "admin");
entityAuditEventV2.setAction(action);
entityAuditEventV2.setDetails(details);
entityAuditEventV2.setEntity(entity);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 457f86575..6f13175ce 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -28,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasEntityType;
@@ -226,6 +227,13 @@ public interface AtlasEntityStore {
*/
EntityMutationResponse purgeByIds(Set<String> guids) throws
AtlasBaseException;
+ /*
+ * Purges the given entity guids in a single batch and returns the resulting
mutation response
+ */
+ EntityMutationResponse purgeEntitiesInBatch(Set<String> deletedVertices)
throws AtlasBaseException;
+
+ Set<AtlasVertex> accumulateDeletionCandidates(Set<String> vertices) throws
AtlasBaseException;
+
/**
* Add classification(s)
*/
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 24c80c0a1..5d83f2934 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -63,6 +63,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -77,18 +78,22 @@ import static org.apache.atlas.model.TypeCategory.STRUCT;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.PURGED;
+import static
org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATASET;
+import static
org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_PROCESS;
import static
org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static
org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER;
import static
org.apache.atlas.repository.Constants.EDGE_PENDING_TASKS_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
import static
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static
org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY;
import static
org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static
org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static
org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
import static
org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex;
import static
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
@@ -127,6 +132,8 @@ public abstract class DeleteHandlerV1 {
public static final Logger LOG =
LoggerFactory.getLogger(DeleteHandlerV1.class);
private static final boolean DEFERRED_ACTION_ENABLED =
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
+ private static final String PROCESS_OUTPUTS_EDGE =
"__Process.outputs";
+ private static final String RELATIONSHIP_ATTRIBUTE_KEY_STRING =
"columnLineages";
protected final GraphHelper graphHelper;
@@ -156,12 +163,22 @@ public abstract class DeleteHandlerV1 {
* @throws AtlasBaseException
*/
public void deleteEntities(Collection<AtlasVertex> instanceVertices)
throws AtlasBaseException {
+ Set<AtlasVertex> deletionCandidateVertices =
accumulateDeletionCandidates(instanceVertices);
+ deleteTraitsAndVertices(deletionCandidateVertices);
+ }
+
+ /*
+ accumulate the deletion candidates
+ */
+ public Set<AtlasVertex>
accumulateDeletionCandidates(Collection<AtlasVertex> instanceVertices) throws
AtlasBaseException {
final RequestContext requestContext =
RequestContext.get();
final Set<AtlasVertex> deletionCandidateVertices = new HashSet<>();
final boolean isPurgeRequested =
requestContext.isPurgeRequested();
+ final Set<String> instanceVertexGuids = new HashSet<>();
for (AtlasVertex instanceVertex : instanceVertices) {
final String guid =
AtlasGraphUtilsV2.getIdFromVertex(instanceVertex);
+ instanceVertexGuids.add(guid);
if (skipVertexForDelete(instanceVertex)) {
if (LOG.isDebugEnabled()) {
@@ -186,15 +203,155 @@ public abstract class DeleteHandlerV1 {
requestContext.recordEntityDelete(entityHeader);
deletionCandidateVertices.add(vertexInfo.getVertex());
}
+
+ AtlasEntityHeader entity =
entityRetriever.toAtlasEntityHeader(instanceVertex);
+ AtlasEntityType entityType =
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ if (entityType.getEntityDef().hasSuperType(ATLAS_TYPE_DATASET)) {
+ addUpstreamProcessEntities(instanceVertex,
deletionCandidateVertices, instanceVertexGuids);
+ }
+
+ if (entityType.getEntityDef().hasSuperType(ATLAS_TYPE_PROCESS)) {
+ getColumnLineageEntities(instanceVertex,
deletionCandidateVertices);
+ }
}
+ return deletionCandidateVertices;
+ }
- // Delete traits and vertices.
+ /*
+ actually delete traits and then the vertex along its references
+ */
+ public void deleteTraitsAndVertices(Collection<AtlasVertex>
deletionCandidateVertices) throws AtlasBaseException {
for (AtlasVertex deletionCandidateVertex : deletionCandidateVertices) {
deleteAllClassifications(deletionCandidateVertex);
deleteTypeVertex(deletionCandidateVertex,
isInternalType(deletionCandidateVertex));
}
}
+ public void addUpstreamProcessEntities(AtlasVertex entityVertex,
Set<AtlasVertex> deletionCandidateVertices, Set<String> instanceVertexGuids)
throws AtlasBaseException {
+ RequestContext requestContext = RequestContext.get();
+
+ Iterator<AtlasEdge> edgeIterator =
GraphHelper.getIncomingEdgesByLabel(entityVertex, PROCESS_OUTPUTS_EDGE);
+
+ String entityVertexGuid = entityVertex.getProperty(GUID_PROPERTY_KEY,
String.class);
+
+ while (edgeIterator.hasNext()) {
+ AtlasEdge edge = edgeIterator.next();
+ AtlasVertex processVertex = edge.getOutVertex();
+
+ String guid = processVertex.getProperty(GUID_PROPERTY_KEY,
String.class);
+ if (instanceVertexGuids.contains(guid)) {
+ return; // already added
+ }
+
+ boolean isEligible = isEligible(processVertex, entityVertexGuid,
instanceVertexGuids);
+
+ if (isEligible) {
+ instanceVertexGuids.add(guid);
+
+ getColumnLineageEntities(processVertex,
deletionCandidateVertices);
+
+ for (GraphHelper.VertexInfo vertexInfo :
getOwnedVertices(processVertex)) {
+ AtlasEntityHeader entityHeader = vertexInfo.getEntity();
+
+ if (requestContext.isPurgeRequested()) {
+
entityHeader.setClassifications(entityRetriever.getAllClassifications(vertexInfo.getVertex()));
+ }
+
+ requestContext.recordEntityDelete(entityHeader);
+ deletionCandidateVertices.add(vertexInfo.getVertex());
+ }
+ }
+ }
+ }
+
+ public void getColumnLineageEntities(AtlasVertex process, Set<AtlasVertex>
deletionCandidateVertices) throws AtlasBaseException {
+ RequestContext requestContext = RequestContext.get();
+
+ AtlasEntityHeader entity =
entityRetriever.toAtlasEntityHeader(process);
+ AtlasEntityType entityType =
typeRegistry.getEntityTypeByName(entity.getTypeName());
+ Map<String, Map<String, AtlasAttribute>> relationshipAttributes =
entityType.getRelationshipAttributes();
+ Map<String, AtlasAttribute> columnLineages =
relationshipAttributes.get(RELATIONSHIP_ATTRIBUTE_KEY_STRING);
+
+ if (columnLineages != null && !columnLineages.isEmpty()) {
+ AtlasAttribute atlasAttribute =
columnLineages.values().iterator().next();
+ String relationshipEdgeLabel =
atlasAttribute.getRelationshipEdgeLabel();
+
+ Iterator<AtlasEdge> edgeIterator =
GraphHelper.getIncomingEdgesByLabel(process, relationshipEdgeLabel);
+
+ int addedCount = 0;
+
+ while (edgeIterator.hasNext()) {
+ AtlasVertex columnLineageVertex =
edgeIterator.next().getOutVertex();
+ String typeName =
columnLineageVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column-lineage candidate: type={}, guid={},
state={}, viaEdgeLabel={}, process={}",
+ typeName, getGuid(columnLineageVertex),
getState(columnLineageVertex), relationshipEdgeLabel, string(process));
+ }
+
+ AtlasEntityHeader columnLineageEntityHeader =
entityRetriever.toAtlasEntityHeader(columnLineageVertex);
+ requestContext.recordEntityDelete(columnLineageEntityHeader);
+ deletionCandidateVertices.add(columnLineageVertex);
+ addedCount++;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collected {} column-lineage vertices via '{}' for
process {}", addedCount, relationshipEdgeLabel, getGuid(process));
+ }
+ }
+ }
+
+ boolean isEligible(AtlasVertex processVertex, String entityVertexGuid,
Set<String> instanceVertexGuids) throws AtlasBaseException {
+ RequestContext requestContext = RequestContext.get();
+
+ if (requestContext.isPurgeRequested() && getState(processVertex) ==
ACTIVE) {
+            // skip purging ACTIVE process entities, for backward
compatibility
+ return false;
+ }
+
+ Iterator<AtlasEdge> processEdges =
GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE);
+
+ long countDeletedOutputs = 0;
+ long instanceDeleted = 0; // for handling the case of deletion
+
+ while (processEdges.hasNext()) {
+ //process vertex can have the deleted outgoing edges apart from
the current data_set entity
+ AtlasVertex outputEntity = processEdges.next().getInVertex();
+ String outputEntityGuid =
outputEntity.getProperty(GUID_PROPERTY_KEY, String.class); // output guid
+
+ if (getState(outputEntity) == ACTIVE &&
!entityVertexGuid.equals(outputEntityGuid)) {
+ return false;
+ }
+ countDeletedOutputs++;
+
+ if (requestContext.isPurgeRequested() &&
instanceVertexGuids.contains(outputEntityGuid)) {
+ instanceDeleted++; //for checking that if all outputs are in
the instanceVertexGuids during purge
+ }
+ }
+
+        if (requestContext.isPurgeRequested() && countDeletedOutputs > 1) { //
purge the process only if every remaining (deleted) output is also being purged
+ // but can skip process entity if the all datasets are to be
purged in a single scheduled job
+ return countDeletedOutputs == instanceDeleted;
+ }
+
+ return true;
+ }
+
+ public static boolean isSoftDeletableProcess(AtlasVertex processVertex) {
+ Iterator<AtlasEdge> processEdges =
GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE);
+
+ while (processEdges.hasNext()) {
+ AtlasVertex output = processEdges.next().getInVertex();
+
+ if (getState(output) == ACTIVE) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
/**
* Delete the specified relationship edge.
*
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index cdfa8b542..3dd45f98f 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -577,6 +577,53 @@ public class AtlasEntityStoreV2 implements
AtlasEntityStore {
return ret;
}
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse purgeEntitiesInBatch(Set<String>
purgeCandidates) throws AtlasBaseException {
+ LOG.info("==> purgeEntitiesInBatch()");
+
+ Collection<AtlasVertex> purgeVertices = new ArrayList<>();
+ EntityMutationResponse response = new EntityMutationResponse();
+
+ RequestContext requestContext = RequestContext.get();
+ requestContext.setDeleteType(DeleteType.HARD); // hard deleter
+ requestContext.setPurgeRequested(true);
+
+ for (String guid : purgeCandidates) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid);
+ if (vertex != null) {
+ AtlasEntityHeader entityHeader =
entityRetriever.toAtlasEntityHeader(vertex);
+ purgeVertices.add(vertex);
+ response.addEntity(PURGE, entityHeader);
+ }
+ }
+
+ deleteDelegate.getHandler().deleteTraitsAndVertices(purgeVertices);
+
+ entityChangeNotifier.onEntitiesMutated(response, false);
+
+ for (AtlasEntityHeader entity : response.getPurgedEntities()) {
+ LOG.info("Auto purged entity with guid {}", entity.getGuid());
+ }
+
+ LOG.info("<== purgeEntitiesInBatch()");
+
+ return response;
+ }
+
+ @Override
+ public Set<AtlasVertex> accumulateDeletionCandidates(Set<String> guids)
throws AtlasBaseException {
+ LOG.info("==> accumulateDeletionCandidates() !");
+ Set<AtlasVertex> vertices = new HashSet<>();
+
+ for (String guid : guids) {
+ AtlasVertex vertex = entityRetriever.getEntityVertex(guid);
+ vertices.add(vertex);
+ }
+
+ return
deleteDelegate.getHandler().accumulateDeletionCandidates(vertices);
+ }
+
@Override
@GraphTransaction
public void addClassifications(final String guid, final
List<AtlasClassification> classifications) throws AtlasBaseException {
diff --git
a/repository/src/main/java/org/apache/atlas/services/PurgeService.java
b/repository/src/main/java/org/apache/atlas/services/PurgeService.java
new file mode 100644
index 000000000..8f5de1f72
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/services/PurgeService.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.services;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.DeleteType;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.annotation.Timed;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.service.Service;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+import static
org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
+
+@AtlasService
+@Order(9)
+@Component
+public class PurgeService implements Service {
+ private static final Logger LOG =
LoggerFactory.getLogger(PurgeService.class);
+ private static final Logger PERF_LOG =
AtlasPerfTracer.getPerfLogger("service.Purge");
+ private final AtlasGraph atlasGraph;
+ private static Configuration atlasProperties;
+ private final AtlasEntityStore entityStore;
+ private final AtlasTypeRegistry typeRegistry;
+
+ private static final String ENABLE_PROCESS_SOFT_DELETION =
"atlas.enable.process.soft.delete";
+ private static final boolean ENABLE_PROCESS_SOFT_DELETION_DEFAULT = false;
+ private static final String PURGE_ENABLED_SERVICE_TYPES =
"atlas.purge.enabled.services";
+ private static final String SOFT_DELETE_ENABLED_PROCESS_TYPES =
"atlas.soft.delete.enabled.process.types";
+ private static final String PURGE_BATCH_SIZE =
"atlas.purge.batch.size";
+ private static final int DEFAULT_PURGE_BATCH_SIZE = 1000;
// fetching limit at a time
+ private static final String PURGE_WORKER_BATCH_SIZE =
"atlas.purge.worker.batch.size";
+ private static final int DEFAULT_PURGE_WORKER_BATCH_SIZE = 100;
+ private static final String CLEANUP_WORKER_BATCH_SIZE =
"atlas.cleanup.worker.batch.size";
+ private static final int DEFAULT_CLEANUP_WORKER_BATCH_SIZE = 100;
+ private static final String PURGE_RETENTION_PERIOD =
"atlas.purge.deleted.entity.retention.days";
+ private static final int PURGE_RETENTION_PERIOD_DEFAULT = 30; //
days
+ private static final String PURGE_WORKERS_COUNT =
"atlas.purge.workers.count";
+ private static final int DEFAULT_PURGE_WORKERS_COUNT = 2;
+ private static final String CLEANUP_WORKERS_COUNT =
"atlas.cleanup.workers.count";
+ private static final int DEFAULT_CLEANUP_WORKERS_COUNT = 2;
+ private static final String PROCESS_ENTITY_CLEANER_THREAD_NAME =
"Process-Entity-Cleaner";
+ private final String indexSearchPrefix =
AtlasGraphUtilsV2.getIndexSearchPrefix();
+ private static final int DEFAULT_CLEANUP_BATCH_SIZE = 1000;
+ private static final String CLEANUP_WORKERS_NAME =
"Process-Cleanup-Worker";
+ private static final String PURGE_WORKERS_NAME =
"Entity-Purge-Worker";
+ private static final String DELETED =
"DELETED";
+ private static final String ACTIVE =
"ACTIVE";
+ private static final String AND_STR = " AND
";
+
+ static {
+ try {
+ atlasProperties = ApplicationProperties.get();
+ } catch (Exception e) {
+ LOG.info("Failed to load application properties", e);
+ }
+ }
+
    /**
     * Creates the purge service.
     *
     * @param atlasgraph   graph instance used for index queries over the vertex index
     * @param entityStore  entity store used to soft-delete and hard-purge entities
     * @param typeRegistry type registry used to resolve eligible entity/process type names
     */
    @Inject
    public PurgeService(AtlasGraph atlasgraph, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
        this.atlasGraph = atlasgraph;
        this.entityStore = entityStore;
        this.typeRegistry = typeRegistry;
    }
+
+ @Override
+ public void start() throws AtlasException {
+ if (!getSoftDeletionFlag()) {
+ LOG.info("==> cleanup not enabled");
+ return;
+ }
+
+ LOG.info("==> PurgeService.start()");
+
+ launchCleanUp();
+
+ LOG.info("<== Launched the clean up thread");
+ }
+
    /**
     * Service lifecycle hook. Nothing is torn down here: launchCleanUp() keeps no
     * handle to the thread it starts, so there is nothing to join or interrupt.
     */
    @Override
    public void stop() throws AtlasException {
        LOG.info("==> stopping the purge service");
    }
+
+ public void launchCleanUp() {
+ LOG.info("==> launching the new thread");
+
+ Thread thread = new Thread(
+ () -> {
+ long startTime = System.currentTimeMillis();
+ LOG.info("==> {} started",
PROCESS_ENTITY_CLEANER_THREAD_NAME);
+ softDeleteProcessEntities();
+ LOG.info("==> exiting thread {}",
PROCESS_ENTITY_CLEANER_THREAD_NAME);
+ long endTime = System.currentTimeMillis();
+ LOG.info("==> completed cleanup {} seconds !", (endTime -
startTime) / 1000);
+ });
+
+ thread.setName(PROCESS_ENTITY_CLEANER_THREAD_NAME);
+ thread.start();
+ LOG.info("==> launched the thread for the clean up");
+ }
+
    /**
     * Hard-purges soft-deleted entities of the configured service types whose last
     * modification is older than the retention period.
     *
     * Flow: fetch one index-query page of DELETED vertices, expand each via
     * entityStore.accumulateDeletionCandidates(), feed candidates (deduplicated by GUID)
     * to a pool of purge workers, then drain the workers' results into the response.
     * The request context is switched to HARD delete + purge mode for the whole run.
     *
     * @return mutation response listing every entity header the workers purged
     */
    @SuppressWarnings("unchecked")
    @Timed
    public EntityMutationResponse purgeEntities() {
        LOG.info("==> PurgeService.purgeEntities()");
        // index query of specific batch size
        AtlasPerfTracer perf = null;
        EntityMutationResponse entityMutationResponse = new EntityMutationResponse();
        RequestContext requestContext = RequestContext.get();
        requestContext.setDeleteType(DeleteType.HARD); // hard delete
        requestContext.setPurgeRequested(true);

        try {
            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "PurgeService.purgeEntities");
            }

            Set<String> allEligibleTypes = getEntityTypes();

            try {
                //bring n number of entities like 1000 at point of type Processes
                WorkItemsQualifier wiq = createQualifier(typeRegistry, entityStore, atlasGraph, getPurgeWorkerBatchSize(), getPurgeWorkersCount(), true);

                String indexQuery = getBulkQueryString(allEligibleTypes, getPurgeRetentionPeriod());
                Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertices(0, getPurgeBatchSize());
                LOG.info("==> fetched Deleted entities");

                // NOTE(review): this early return skips wiq.shutdown() even though the
                // qualifier (and presumably its worker threads) was already created above
                // — confirm WorkItemManager does not leak threads in this path.
                if (!itr.hasNext()) {
                    LOG.info("==> no Purge Entities found");
                    return entityMutationResponse;
                }

                // GUIDs already handed to a worker; prevents purging the same vertex twice
                Set<String> producedDeletionCandidates = new HashSet<>(); // look up

                while (itr.hasNext()) {
                    AtlasVertex vertex = itr.next().getVertex();

                    if (vertex == null) {
                        continue;
                    }

                    String guid = vertex.getProperty(GUID_PROPERTY_KEY, String.class);

                    if (!producedDeletionCandidates.contains(guid)) {
                        Set<String> instanceVertex = new HashSet<>();
                        instanceVertex.add(guid);

                        // expand to the full set of vertices that must go with this one
                        Set<AtlasVertex> deletionCandidates = entityStore.accumulateDeletionCandidates(instanceVertex);

                        for (AtlasVertex deletionCandidate : deletionCandidates) {
                            String deletionCandidateGuid = deletionCandidate.getProperty(GUID_PROPERTY_KEY, String.class);
                            if (!producedDeletionCandidates.contains(deletionCandidateGuid)) {
                                producedDeletionCandidates.add(deletionCandidateGuid);
                                wiq.checkProduce(deletionCandidate);
                            }
                        }
                    }
                }

                // blocks until all produced work items have been consumed
                wiq.shutdown();

                // collecting all the results (raw Queue: elements are AtlasEntityHeader per the cast below)
                Queue results = wiq.getResults();

                LOG.info("==> Purged {} !", results.size());

                while (!results.isEmpty()) {
                    AtlasEntityHeader entityHeader = (AtlasEntityHeader) results.poll();
                    if (entityHeader == null) {
                        continue;
                    }
                    entityMutationResponse.addEntity(PURGE, entityHeader);
                }
            } catch (Exception ex) {
                // best-effort: a failed run returns whatever was collected so far
                LOG.error("purge: failed!", ex);
            } finally {
                LOG.info("purge: Done!");
            }
        } finally {
            AtlasPerfTracer.log(perf);
        }

        LOG.info("<== PurgeService.purgeEntities()");

        return entityMutationResponse;
    }
+
    /**
     * Soft-deletes ACTIVE entities of the configured process types by paging through
     * the vertex index and feeding each vertex to a pool of cleanup workers
     * (eligibility per vertex is checked by the workers, see EntityQualifier).
     *
     * NOTE(review): pages are fetched by a fixed offset while the workers are
     * concurrently soft-deleting ACTIVE entities from the same result set — as
     * entities flip to DELETED the remaining pages shift, so some entities may be
     * skipped within a single run. Confirm whether a single pass is expected to be
     * complete or whether later runs are relied upon to catch stragglers.
     */
    @SuppressWarnings("unchecked")
    @Timed
    public void softDeleteProcessEntities() {
        LOG.info("==> softDeleteProcessEntities()");

        AtlasPerfTracer perf = null;
        try {
            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "PurgeService.softDeleteProcessEntities");
            }

            Set<String> validProcessTypes = getProcessTypes();
            try {
                //bring n number of entities like 1000 at point of type Processes
                WorkItemsQualifier wiq = createQualifier(typeRegistry, entityStore, atlasGraph, getCleanupWorkerBatchSize(), getCleanUpWorkersCount(), false);
                int offset = 0;
                boolean moreResults = true;

                while (moreResults) {
                    // retentionPeriod 0 => query targets ACTIVE entities with no timestamp filter
                    String indexQuery = getBulkQueryString(validProcessTypes, 0);
                    Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertices(offset, DEFAULT_CLEANUP_BATCH_SIZE);
                    LOG.info("==> fetched entities");

                    if (!itr.hasNext()) {
                        moreResults = false;
                    }

                    while (itr.hasNext()) {
                        AtlasVertex vertex = itr.next().getVertex();
                        if (vertex != null) {
                            wiq.checkProduce(vertex);
                        }
                    }

                    offset += DEFAULT_CLEANUP_BATCH_SIZE;
                    LOG.info("==> offset {}", offset);
                }

                // blocks until all produced work items have been consumed
                wiq.shutdown();
            } catch (Exception ex) {
                LOG.error("cleanUp: failed!", ex);
            } finally {
                LOG.info("cleanUp: Done!");
            }
        } finally {
            AtlasPerfTracer.log(perf);
        }

        LOG.info("<== softDeleteProcessEntities()");
    }
+
+ static class EntityQualifier extends WorkItemConsumer<AtlasVertex> {
+ private final Set<String> batch = new HashSet<>();
+ private final AtlasEntityStore entityStore;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasGraph atlasGraph;
+ private final boolean isPurgeEnabled;
+ private int batchesProcessed;
+ private int batchSize;
+
+ public EntityQualifier(BlockingQueue<AtlasVertex> queue,
AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasGraph
atlasGraph, boolean isPurgeEnabled, int batchSize) {
+ super(queue);
+ this.typeRegistry = typeRegistry;
+ this.entityStore = entityStore;
+ this.atlasGraph = atlasGraph;
+ this.isPurgeEnabled = isPurgeEnabled;
+ this.batchesProcessed = 0;
+ this.batchSize = batchSize;
+
+ if (isPurgeEnabled) {
+ LOG.info("==> consumers are purge enabled , batch size is {}",
batchSize);
+ } else {
+ LOG.info("==> consumers are soft delete enabled , batch size
is {}", batchSize);
+ }
+ }
+
+ @Override
+ protected void processItem(AtlasVertex vertex) {
+ String guid = vertex.getProperty(GUID_PROPERTY_KEY, String.class);
+ LOG.info("==> processing the entity {}", guid);
+
+ try {
+ if (!isPurgeEnabled && !isEligible(vertex)) {
+ return;
+ }
+ batch.add(guid);
+ commit();
+ } catch (Exception ex) {
+ LOG.info("{}", ex.getMessage());
+ }
+ }
+
+ @Override
+ protected void doCommit() {
+ if (batch.size() == batchSize) {
+ attemptCommit();
+ }
+ }
+
+ @Override
+ protected void commitDirty() {
+ if (!batch.isEmpty()) {
+ attemptCommit();
+ }
+
+ super.commitDirty();
+ }
+
+ protected void attemptCommit() {
+ EntityMutationResponse res;
+ List<AtlasEntityHeader> results = Collections.emptyList();
+
+ try {
+ if (isPurgeEnabled) {
+ // purging not by directly
+ res = entityStore.purgeEntitiesInBatch(batch);
+ } else {
+ List<String> batchList = new ArrayList<>(batch);
+ res = entityStore.deleteByIds(batchList);
+ }
+
+ results = isPurgeEnabled ? res.getPurgedEntities() :
res.getDeletedEntities();
+
+ if (CollectionUtils.isEmpty(results)) {
+ return;
+ }
+
+ for (AtlasEntityHeader entityHeader : results) {
+ addResult(entityHeader); // adding results
+ }
+ } catch (Exception e) {
+ LOG.info("==> Exception: {}", e.getMessage());
+ } finally {
+ batchesProcessed++;
+ batch.clear();
+ LOG.info("==> Processed {} batch number with total {} entities
purged!", batchesProcessed, results.size());
+ }
+ }
+ }
+
+ static class EntityQualifierBuilder implements
WorkItemBuilder<EntityQualifier, AtlasVertex> {
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasEntityStore entityStore;
+ private final AtlasGraph atlasGraph;
+ private final boolean isPurgeEnabled;
+ private int batchSize;
+
+ public EntityQualifierBuilder(AtlasTypeRegistry typeRegistry,
AtlasEntityStore entityStore, AtlasGraph atlasGraph, boolean isPurgeEnabled,
int batchSize) {
+ this.typeRegistry = typeRegistry;
+ this.entityStore = entityStore;
+ this.atlasGraph = atlasGraph;
+ this.isPurgeEnabled = isPurgeEnabled;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public EntityQualifier build(BlockingQueue<AtlasVertex> queue) {
+ return new EntityQualifier(queue, typeRegistry, entityStore,
atlasGraph, isPurgeEnabled, batchSize);
+ }
+ }
+
+ static class WorkItemsQualifier extends WorkItemManager<AtlasVertex,
EntityQualifier> {
+ public WorkItemsQualifier(WorkItemBuilder builder, int batchSize, int
numWorkers, boolean isPurgeEnabled) {
+ super(builder, isPurgeEnabled ? PURGE_WORKERS_NAME :
CLEANUP_WORKERS_NAME, batchSize, numWorkers, true);
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ LOG.info("==> Shutting down manager!");
+ super.shutdown();
+ }
+ }
+
+ public WorkItemsQualifier createQualifier(AtlasTypeRegistry typeRegistry,
AtlasEntityStore entityStore, AtlasGraph atlasGraph, int batchSize, int
numWorkers, boolean isPurgeEnabled) {
+ EntityQualifierBuilder eqb = new EntityQualifierBuilder(typeRegistry,
entityStore, atlasGraph, isPurgeEnabled, batchSize);
+ LOG.info("==> creating the purge entity producer");
+ return new WorkItemsQualifier(eqb, batchSize, numWorkers,
isPurgeEnabled);
+ }
+
    // Delegates to DeleteHandlerV1 to decide whether a process vertex may be soft-deleted.
    public static boolean isEligible(AtlasVertex vertex) {
        return DeleteHandlerV1.isSoftDeletableProcess(vertex);
    }
+
+ private String getBulkQueryString(Set<String> typeNames, int
retentionPeriod) {
+ String joinedTypes = typeNames.stream()
+ .map(t -> "\"" + t + "\"")
+ .collect(Collectors.joining(" OR ", "(", ")"));
+
+ String indexQuery = getString(retentionPeriod, joinedTypes);
+
+ LOG.info("bulk index query : {}", indexQuery);
+ return indexQuery;
+ }
+
+ private long timeThresholdMillis(int retentionPeriod) {
+ long currentTimeMillis = System.currentTimeMillis();
+ long retentionPeriodMillis = retentionPeriod * 24L * 60 * 60 * 1000;
// Convert days to ms
+ return currentTimeMillis - retentionPeriodMillis;
+ }
+
    /**
     * Assembles the free-text index query for {@link #getBulkQueryString}.
     *
     * retentionDays > 0  (purge path):   DELETED entities modified before the retention cutoff.
     * retentionDays == 0 (cleanup path): ACTIVE entities, no timestamp clause.
     *
     * baseQuery carries one %s (the state); the purge branch appends a second %s
     * for the timestamp upper bound — format arguments are (DELETED, cutoffMillis).
     */
    private String getString(int retentionDays, String joinedTypes) {
        String baseQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\": " + joinedTypes + AND_STR +
                indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\": (%s)";

        String indexQuery = (retentionDays > 0)
                ? String.format(baseQuery + AND_STR + indexSearchPrefix + "\"" + MODIFICATION_TIMESTAMP_PROPERTY_KEY + "\": [* TO %s]", DELETED, timeThresholdMillis(retentionDays))
                : String.format(baseQuery, ACTIVE);

        return indexQuery;
    }
+
+ public boolean getSoftDeletionFlag() {
+ if (atlasProperties != null) {
+ return atlasProperties.getBoolean(ENABLE_PROCESS_SOFT_DELETION,
ENABLE_PROCESS_SOFT_DELETION_DEFAULT);
+ }
+ return false;
+ }
+
+ private int getPurgeRetentionPeriod() {
+ int retentionPeriod = PURGE_RETENTION_PERIOD_DEFAULT;
+
+ if (atlasProperties != null) {
+ retentionPeriod = atlasProperties.getInt(PURGE_RETENTION_PERIOD,
PURGE_RETENTION_PERIOD_DEFAULT);
+ }
+
+ return Math.max(PURGE_RETENTION_PERIOD_DEFAULT, retentionPeriod); //
for enforcing the minimum retention period of 30 days
+ }
+
+ private Set<String> getProcessTypes() {
+ Set<String> processTypes = new HashSet<>();
+
+ if (atlasProperties != null) {
+ String[] eligibleTypes =
atlasProperties.getStringArray(SOFT_DELETE_ENABLED_PROCESS_TYPES); // e.g.
hive, spark
+ for (String type : eligibleTypes) {
+ if (typeRegistry.isRegisteredType(type)) {
+ processTypes.add(type);
+ }
+ }
+ }
+
+ return processTypes;
+ }
+
+ public Set<String> getEntityTypes() {
+ Set<String> entityTypes = new HashSet<>();
+
+ if (atlasProperties != null) {
+ String[] eligibleServiceTypes =
atlasProperties.getStringArray(PURGE_ENABLED_SERVICE_TYPES); // e.g. hive, spark
+ Set<String> serviceTypes =
Arrays.stream(eligibleServiceTypes).collect(Collectors.toSet());
+
+ for (AtlasEntityDef entityDef : typeRegistry.getAllEntityDefs()) {
+ if (serviceTypes.contains(entityDef.getServiceType())) {
+ entityTypes.add(entityDef.getName());
+ }
+ }
+ }
+
+ return entityTypes;
+ }
+
+ private int getPurgeBatchSize() {
+ if (atlasProperties != null) {
+ return atlasProperties.getInt(PURGE_BATCH_SIZE,
DEFAULT_PURGE_BATCH_SIZE);
+ }
+ return DEFAULT_PURGE_BATCH_SIZE;
+ }
+
+ private int getPurgeWorkersCount() {
+ if (atlasProperties != null) {
+ return atlasProperties.getInt(PURGE_WORKERS_COUNT,
DEFAULT_PURGE_WORKERS_COUNT);
+ }
+ return DEFAULT_PURGE_WORKERS_COUNT;
+ }
+
+ private int getCleanUpWorkersCount() {
+ if (atlasProperties != null) {
+ return atlasProperties.getInt(CLEANUP_WORKERS_COUNT,
DEFAULT_CLEANUP_WORKERS_COUNT);
+ }
+ return DEFAULT_CLEANUP_WORKERS_COUNT;
+ }
+
+ private int getPurgeWorkerBatchSize() {
+ if (atlasProperties != null) {
+ return atlasProperties.getInt(PURGE_WORKER_BATCH_SIZE,
DEFAULT_PURGE_WORKER_BATCH_SIZE);
+ }
+ return DEFAULT_PURGE_WORKER_BATCH_SIZE;
+ }
+
+ private int getCleanupWorkerBatchSize() {
+ if (atlasProperties != null) {
+ return atlasProperties.getInt(CLEANUP_WORKER_BATCH_SIZE,
DEFAULT_CLEANUP_WORKER_BATCH_SIZE);
+ }
+ return DEFAULT_CLEANUP_WORKER_BATCH_SIZE;
+ }
+}
diff --git
a/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java
b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java
new file mode 100644
index 000000000..0f4e0482c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.services;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.DeleteType;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.TestLoadModelUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = org.apache.atlas.TestModules.TestOnlyModule.class)
+public class PurgeServiceTest extends AtlasTestBase {
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @Inject
+ private AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasEntityStoreV2 entityStore;
+
+ @Inject
+ private AtlasGraph atlasGraph;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ RequestContext.clear();
+ super.initialize();
+ TestLoadModelUtils.loadBaseModel(typeDefStore, typeRegistry);
+ TestLoadModelUtils.loadHiveModel(typeDefStore, typeRegistry);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) { }
+ }
+
    @Test
    public void testPurgeEntities() throws Exception {
        // Approach (pre-commit check on async flow):
        // - Spy store's IAtlasEntityChangeNotifier and capture onEntitiesMutated(response,false) to assert
        //   the purged GUID before commit/locking paths.
        // Create DB and table (dbGuid is unused beyond creation; the db must exist so the
        // table's "db" object-id reference resolves)
        AtlasEntity db = newHiveDb(null);
        String dbGuid = persistAndGetGuid(db);
        AtlasEntity tbl = newHiveTable(db, null);
        String tblGuid = persistAndGetGuid(tbl);

        // Soft-delete table
        EntityMutationResponse del = entityStore.deleteByIds(Collections.singletonList(tblGuid));
        assertNotNull(del);

        // Backdate timestamp beyond default 30d retention and reindex to reflect in Solr
        backdateModificationTimestamp(tblGuid, 31);
        reindexVertices(tblGuid);
        pauseForIndexCreation();

        // Enable hive purge and single worker for determinism
        ApplicationProperties.get().setProperty("atlas.purge.enabled.services", "hive");
        ApplicationProperties.get().setProperty("atlas.purge.workers.count", "1");
        ApplicationProperties.get().setProperty("atlas.purge.worker.batch.size", "1");

        // Clear context to avoid skipping due to prior delete tracking
        RequestContext.clear();

        // Inject notifier spy to capture pre-commit signal (pre-commit verification point)
        Object originalNotifier = injectNotifierSpy(entityStore);

        new PurgeService(atlasGraph, entityStore, typeRegistry).purgeEntities();

        // Verify notifier call and capture response before transaction commit (async-safe via timeout)
        IAtlasEntityChangeNotifier spy = (IAtlasEntityChangeNotifier) getNotifier(entityStore);
        ArgumentCaptor<EntityMutationResponse> cap = ArgumentCaptor.forClass(EntityMutationResponse.class);
        Mockito.verify(spy, Mockito.timeout(5000)).onEntitiesMutated(cap.capture(), Mockito.eq(false));

        EntityMutationResponse notified = cap.getValue();
        assertNotNull(notified);
        List<AtlasEntityHeader> purged = notified.getPurgedEntities();
        assertNotNull(purged);

        // the soft-deleted, backdated table must be among the purged headers
        assertTrue(purged.stream().anyMatch(h -> tblGuid.equals(h.getGuid())));

        // Restore original notifier to leave the store in a clean state
        restoreNotifier(entityStore, originalNotifier);

        // Flag assertions: purgeEntities() must have switched the request context
        // to purge mode with HARD delete semantics
        assertTrue(RequestContext.get().isPurgeRequested());
        assertEquals(RequestContext.get().getDeleteType(), DeleteType.HARD);
    }
+
+ private AtlasEntity newHiveDb(String nameOpt) {
+ String name = nameOpt != null ? nameOpt :
RandomStringUtils.randomAlphanumeric(10);
+ AtlasEntity db = new AtlasEntity("hive_db");
+ db.setAttribute("name", name);
+ db.setAttribute("qualifiedName", name);
+ db.setAttribute("clusterName", "cl1");
+ db.setAttribute("location", "/tmp");
+ db.setAttribute("description", "test db");
+ return db;
+ }
+
+ private AtlasEntity newHiveTable(AtlasEntity db, String nameOpt) {
+ String name = nameOpt != null ? nameOpt :
RandomStringUtils.randomAlphanumeric(10);
+ AtlasEntity tbl = new AtlasEntity("hive_table");
+ tbl.setAttribute("name", name);
+ tbl.setAttribute("qualifiedName", name);
+ tbl.setAttribute("description", "random table");
+ tbl.setAttribute("type", "type");
+ tbl.setAttribute("tableType", "MANAGED");
+ tbl.setAttribute("db", AtlasTypeUtil.getAtlasObjectId(db));
+ return tbl;
+ }
+
+ private String persistAndGetGuid(AtlasEntity entity) throws
AtlasBaseException {
+ EntityMutationResponse resp = entityStore.createOrUpdate(new
AtlasEntityStream(new AtlasEntityWithExtInfo(entity)), false);
+ String typeName = entity.getTypeName();
+ AtlasEntityHeader hdr = resp.getFirstCreatedEntityByTypeName(typeName);
+ String guid = hdr != null ? hdr.getGuid() : null;
+ return guid;
+ }
+
+ private void backdateModificationTimestamp(String guid, int days) {
+ AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, guid);
+ if (v != null) {
+ long delta = days * 24L * 60 * 60 * 1000;
+ long ts = System.currentTimeMillis() - delta;
+ AtlasGraphUtilsV2.setProperty(v,
Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, ts);
+ }
+ }
+
    /**
     * Forces a reindex of the given GUIDs' vertices in both the vertex and full-text
     * indexes, so backdated property changes become visible to index queries.
     * Reindex failures are swallowed on purpose: the test falls back on
     * pauseForIndexCreation() and the 5s Mockito timeout rather than failing here.
     */
    private void reindexVertices(String... guids) {
        List<AtlasElement> elements = new ArrayList<>();
        for (String g : guids) {
            if (g == null) {
                continue;
            }
            AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, g);
            if (v != null) {
                elements.add(v);
            }
        }
        if (!elements.isEmpty()) {
            try {
                atlasGraph.getManagementSystem().reindex(Constants.VERTEX_INDEX, elements);
                atlasGraph.getManagementSystem().reindex(Constants.FULLTEXT_INDEX, elements);
            } catch (Exception ignored) { }
        }
    }
+
+ // Test helper: swap store's private notifier field with a Mockito spy so
we can
+ // capture and assert the pre-commit mutation response invoked by the
store.
+ private Object injectNotifierSpy(AtlasEntityStoreV2 storeV2) throws
Exception {
+ Field f =
AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+ f.setAccessible(true);
+ Object original = f.get(storeV2);
+ Object spy = Mockito.spy(original);
+ f.set(storeV2, spy);
+ return original;
+ }
+
+ // Test helper: fetch the (spied) notifier instance currently installed on
the store.
+ private Object getNotifier(AtlasEntityStoreV2 storeV2) throws Exception {
+ Field f =
AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+ f.setAccessible(true);
+ return f.get(storeV2);
+ }
+
+ // Test helper: restore the original notifier instance after verification.
+ private void restoreNotifier(AtlasEntityStoreV2 storeV2, Object original)
throws Exception {
+ Field f =
AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier");
+ f.setAccessible(true);
+ f.set(storeV2, original);
+ }
+}
diff --git
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 06ff05339..7064cc0da 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -68,6 +68,7 @@ import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
+import org.apache.atlas.services.PurgeService;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -121,6 +122,7 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -162,6 +164,10 @@ public class AdminResource {
private static final List<String> TIMEZONE_LIST =
Arrays.asList(TimeZone.getAvailableIDs());
private static final String METRICS_PERSIST_INTERVAL =
"atlas.metrics.persist.schedule";
private static final String METRICS_PERSIST_INTERVAL_DEFAULT = "0 0
0/1 * * *"; // 1 hour interval
+ private static final String PURGE_CRON_EXPRESSION =
"atlas.purge.cron.expression";
+ private static final String PURGE_CRON_EXPRESSION_DEFAULT = "* *
* 30 2 ?"; // disabled by default, user controlled scheduling only
+ private static final String ACTIVE =
"ACTIVE";
+ private static final String PURGE_THREAD_NAME =
"Scheduled-Purge-Thread";
private static final Configuration atlasProperties;
private final ServiceState serviceState;
@@ -192,6 +198,8 @@ public class AdminResource {
private final boolean isUiTasksTabEnabled;
private final AtlasAuditReductionService auditReductionService;
private Response version;
+ private final PurgeService purgeService;
+ private final ReentrantLock cronPurgeOperationLock;
@Context
private HttpServletRequest httpServletRequest;
@@ -205,7 +213,8 @@ public class AdminResource {
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService,
AtlasEntityStore entityStore,
AtlasPatchManager patchManager, AtlasAuditService auditService,
EntityAuditRepository auditRepository,
- TaskManagement taskManagement, AtlasDebugMetricsSink
debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService,
AtlasMetricsUtil atlasMetricsUtil) {
+ TaskManagement taskManagement, AtlasDebugMetricsSink
debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService,
AtlasMetricsUtil atlasMetricsUtil,
+ PurgeService purgeService) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@@ -224,6 +233,8 @@ public class AdminResource {
this.debugMetricsRESTSink = debugMetricsRESTSink;
this.auditReductionService = atlasAuditReductionService;
this.atlasMetricsUtil = atlasMetricsUtil;
+ this.purgeService = purgeService;
+ this.cronPurgeOperationLock = new ReentrantLock();
if (atlasProperties != null) {
this.defaultUIVersion =
atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V3);
@@ -804,6 +815,63 @@ public class AdminResource {
}
}
+ @Scheduled(cron = "#{getPurgeCronExpression}")
+ public void schedulePurgeEntities() throws AtlasBaseException {
+ try {
+ Thread.currentThread().setName(PURGE_THREAD_NAME);
+ if (acquireCronPurgeOperationLock()) {
+ String state = serviceState.getState().toString();
+ LOG.info("==> Status of current node is {}", state);
+ if (state.equals(ACTIVE)) {
+ LOG.info("==> Scheduled Purging has started");
+ EntityMutationResponse entityMutationResponse =
purgeService.purgeEntities();
+ Set<String> guids = new HashSet<>();
+
+ final List<AtlasEntityHeader> purgedEntities =
entityMutationResponse.getPurgedEntities() != null
+ ? entityMutationResponse.getPurgedEntities()
+ : Collections.emptyList();
+
+ if (CollectionUtils.isEmpty(purgedEntities)) {
+ LOG.info("==> no entities got purged");
+ return;
+ }
+
+ for (AtlasEntityHeader entityHeader :
entityMutationResponse.getPurgedEntities()) {
+ guids.add(entityHeader.getGuid());
+ }
+
+ LOG.info("==> Purged Entities {}", purgedEntities.size());
+
+ auditService.add(AuditOperation.AUTO_PURGE,
guids.toString(), entityMutationResponse.getPurgedEntitiesIds(),
+ entityMutationResponse.getPurgedEntities().size());
+
+ LOG.info("==> Scheduled Purging has finished");
+ } else {
+ LOG.info("==> Current node is not active, so skipping the
scheduled purge");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error while purging entities", e);
+ throw new AtlasBaseException(e);
+ } finally {
+ RequestContext.clear();
+ LOG.info("==> clearing the context");
+ cronPurgeOperationLock.unlock();
+ }
+ }
+
+ @Bean
+ private String getPurgeCronExpression() {
+ if (atlasProperties != null) {
+ return atlasProperties.getString(PURGE_CRON_EXPRESSION,
PURGE_CRON_EXPRESSION_DEFAULT);
+ }
+ return PURGE_CRON_EXPRESSION_DEFAULT;
+ }
+
    // Non-blocking acquire: returns false when another scheduled purge run holds the lock.
    private boolean acquireCronPurgeOperationLock() {
        return cronPurgeOperationLock.tryLock();
    }
+
@POST
@Path("/importfile")
@Produces(Servlets.JSON_MEDIA_TYPE)
diff --git
a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index f7e2c7ef7..aac34993e 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -48,7 +48,7 @@ public class AdminResourceTest {
public void testStatusOfActiveServerIsReturned() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
- AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
- AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();