This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5388730  ATLAS-4499: Classification deletion on entity should check for pending tasks for that entity and classification
5388730 is described below

commit 5388730217ae61cee1be75cc11b5dc908edf4209
Author: Radhika Kundam <[email protected]>
AuthorDate: Thu Feb 3 15:54:14 2022 -0800

    ATLAS-4499: Classification deletion on entity should check for pending tasks for that entity and classification
    
    Signed-off-by: Sidharth Mishra <[email protected]>
    (cherry picked from commit 1c0ab9c93705bd7c302f1404794d0e798d1e9582)
---
 .../main/java/org/apache/atlas/AtlasErrorCode.java |  2 ++
 .../store/graph/v2/EntityGraphMapper.java          | 22 ++++++++++++++++++++--
 .../store/graph/v2/tasks/ClassificationTask.java   | 10 +++++-----
 .../org/apache/atlas/tasks/TaskManagement.java     |  4 ++++
 .../ClassificationPropagationWithTasksTest.java    |  8 +++++++-
 5 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index c7e6f3a..6ca933f 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -201,6 +201,8 @@ public enum AtlasErrorCode {
 
     METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request 
method {0} is inappropriate for the URL: {1}"),
 
+    DELETE_TAG_PROPAGATION_NOT_ALLOWED(406, "ATLAS-406-00-001", 
"Classification delete is not allowed; Add/Update classification propagation is 
in progress for classification: {0} and entity: {1}. Please try again"),
+
     // All data conflict errors go here
     TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already 
exists"),
     TYPE_HAS_REFERENCES(409, "ATLAS-409-00-002", "Given type {0} has 
references"),
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index f6c50bc..9a7f290 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -37,7 +37,8 @@ import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
+ import org.apache.atlas.model.tasks.AtlasTask;
+ import org.apache.atlas.model.typedef.AtlasEntityDef;
 import 
org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
@@ -53,6 +54,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+ import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
  import org.apache.atlas.tasks.TaskManagement;
  import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes;
@@ -2180,7 +2182,18 @@ public class EntityGraphMapper {
 
         if (isPropagationEnabled(classificationVertex)) {
             if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
-                createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
entityVertex, classificationVertex.getIdForDisplay());
+                String classificationVertexId = 
classificationVertex.getIdForDisplay();
+
+                // Create new task only if no pending tasks exists for same 
classification and entity
+                boolean pendingTaskExists  = 
taskManagement.getPendingTasks().stream()
+                        .anyMatch(x -> classificationHasPendingTask(x, 
classificationVertexId, entityGuid));
+
+                if (pendingTaskExists) {
+                    LOG.error("Another tag propagation is in progress for 
classification: {} and entity: {}. Please try again", classificationVertexId, 
entityGuid);
+                    throw new 
AtlasBaseException(AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED, 
classificationVertexId, entityGuid);
+                }
+
+                createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
entityVertex, classificationVertexId);
 
                 entityVertices = new ArrayList<>();
             } else {
@@ -2230,6 +2243,11 @@ public class EntityGraphMapper {
         AtlasPerfTracer.log(perf);
     }
 
+    private boolean classificationHasPendingTask(AtlasTask task, String 
classificationVertexId, String entityGuid) {
+        return 
task.getParameters().get(ClassificationTask.PARAM_CLASSIFICATION_VERTEX_ID).equals(classificationVertexId)
+                && 
task.getParameters().get(ClassificationTask.PARAM_ENTITY_GUID).equals(entityGuid);
+    }
+
     private AtlasEntity updateClassificationText(AtlasVertex vertex) throws 
AtlasBaseException {
         String guid        = graphHelper.getGuid(vertex);
         AtlasEntity entity = instanceConverter.getAndCacheEntity(guid, 
ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index 00c9caa..0bad84e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -47,11 +47,11 @@ import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
 public abstract class ClassificationTask extends AbstractTask {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClassificationTask.class);
 
-    protected static final String PARAM_ENTITY_GUID              = 
"entityGuid";
-    protected static final String PARAM_CLASSIFICATION_VERTEX_ID = 
"classificationVertexId";
-    protected static final String PARAM_RELATIONSHIP_GUID        = 
"relationshipGuid";
-    protected static final String PARAM_RELATIONSHIP_OBJECT      = 
"relationshipObject";
-    protected static final String PARAM_RELATIONSHIP_EDGE_ID     = 
"relationshipEdgeId";
+    public static final String PARAM_ENTITY_GUID              = "entityGuid";
+    public static final String PARAM_CLASSIFICATION_VERTEX_ID = 
"classificationVertexId";
+    public static final String PARAM_RELATIONSHIP_GUID        = 
"relationshipGuid";
+    public static final String PARAM_RELATIONSHIP_OBJECT      = 
"relationshipObject";
+    public static final String PARAM_RELATIONSHIP_EDGE_ID     = 
"relationshipEdgeId";
 
     protected final AtlasGraph             graph;
     protected final EntityGraphMapper      entityGraphMapper;
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 97b9980..5b4bf71 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -116,6 +116,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
         return this.registry.createVertex(taskType, createdBy, parameters);
     }
 
+    public List<AtlasTask> getPendingTasks() {
+        return this.registry.getPendingTasks();
+    }
+
     public List<AtlasTask> getAll() {
         return this.registry.getAll();
     }
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index d440f2f..15c7159 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.tagpropagation;
 
+import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -51,6 +52,7 @@ import java.util.List;
 
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -204,7 +206,11 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
         AtlasVertex entityVertex = 
AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
         AtlasVertex classificationVertex = 
GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
 
-        entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, 
tagX.getTypeName());
+        try {
+            entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, 
tagX.getTypeName());
+        } catch (AtlasBaseException e) {
+            assertEquals(e.getAtlasErrorCode(), 
AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED);
+        }
 
         assertNotNull(entityVertex);
         assertNotNull(classificationVertex);

Reply via email to