This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5388730 ATLAS-4499: Classification deletion on entity should check
for pending tasks for that entity and classification
5388730 is described below
commit 5388730217ae61cee1be75cc11b5dc908edf4209
Author: Radhika Kundam <[email protected]>
AuthorDate: Thu Feb 3 15:54:14 2022 -0800
ATLAS-4499: Classification deletion on entity should check for pending
tasks for that entity and classification
Signed-off-by: Sidharth Mishra <[email protected]>
(cherry picked from commit 1c0ab9c93705bd7c302f1404794d0e798d1e9582)
---
.../main/java/org/apache/atlas/AtlasErrorCode.java | 2 ++
.../store/graph/v2/EntityGraphMapper.java | 22 ++++++++++++++++++++--
.../store/graph/v2/tasks/ClassificationTask.java | 10 +++++-----
.../org/apache/atlas/tasks/TaskManagement.java | 4 ++++
.../ClassificationPropagationWithTasksTest.java | 8 +++++++-
5 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index c7e6f3a..6ca933f 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -201,6 +201,8 @@ public enum AtlasErrorCode {
METHOD_NOT_ALLOWED(405, "ATLAS-405-00-001", "Error 405 - The request
method {0} is inappropriate for the URL: {1}"),
+ DELETE_TAG_PROPAGATION_NOT_ALLOWED(406, "ATLAS-406-00-001",
"Classification delete is not allowed; Add/Update classification propagation is
in progress for classification: {0} and entity: {1}. Please try again"),
+
// All data conflict errors go here
TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already
exists"),
TYPE_HAS_REFERENCES(409, "ATLAS-409-00-002", "Given type {0} has
references"),
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index f6c50bc..9a7f290 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -37,7 +37,8 @@ import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
+ import org.apache.atlas.model.tasks.AtlasTask;
+ import org.apache.atlas.model.typedef.AtlasEntityDef;
import
org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
@@ -53,6 +54,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+ import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
@@ -2180,7 +2182,18 @@ public class EntityGraphMapper {
if (isPropagationEnabled(classificationVertex)) {
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
- createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
entityVertex, classificationVertex.getIdForDisplay());
+ String classificationVertexId =
classificationVertex.getIdForDisplay();
+
+ // Create a new task only if no pending task exists for the same
classification and entity
+ boolean pendingTaskExists =
taskManagement.getPendingTasks().stream()
+ .anyMatch(x -> classificationHasPendingTask(x,
classificationVertexId, entityGuid));
+
+ if (pendingTaskExists) {
+ LOG.error("Another tag propagation is in progress for
classification: {} and entity: {}. Please try again", classificationVertexId,
entityGuid);
+ throw new
AtlasBaseException(AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED,
classificationVertexId, entityGuid);
+ }
+
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
entityVertex, classificationVertexId);
entityVertices = new ArrayList<>();
} else {
@@ -2230,6 +2243,11 @@ public class EntityGraphMapper {
AtlasPerfTracer.log(perf);
}
+ private boolean classificationHasPendingTask(AtlasTask task, String
classificationVertexId, String entityGuid) {
+ return
task.getParameters().get(ClassificationTask.PARAM_CLASSIFICATION_VERTEX_ID).equals(classificationVertexId)
+ &&
task.getParameters().get(ClassificationTask.PARAM_ENTITY_GUID).equals(entityGuid);
+ }
+
private AtlasEntity updateClassificationText(AtlasVertex vertex) throws
AtlasBaseException {
String guid = graphHelper.getGuid(vertex);
AtlasEntity entity = instanceConverter.getAndCacheEntity(guid,
ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index 00c9caa..0bad84e 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -47,11 +47,11 @@ import static
org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
public abstract class ClassificationTask extends AbstractTask {
private static final Logger LOG =
LoggerFactory.getLogger(ClassificationTask.class);
- protected static final String PARAM_ENTITY_GUID =
"entityGuid";
- protected static final String PARAM_CLASSIFICATION_VERTEX_ID =
"classificationVertexId";
- protected static final String PARAM_RELATIONSHIP_GUID =
"relationshipGuid";
- protected static final String PARAM_RELATIONSHIP_OBJECT =
"relationshipObject";
- protected static final String PARAM_RELATIONSHIP_EDGE_ID =
"relationshipEdgeId";
+ public static final String PARAM_ENTITY_GUID = "entityGuid";
+ public static final String PARAM_CLASSIFICATION_VERTEX_ID =
"classificationVertexId";
+ public static final String PARAM_RELATIONSHIP_GUID =
"relationshipGuid";
+ public static final String PARAM_RELATIONSHIP_OBJECT =
"relationshipObject";
+ public static final String PARAM_RELATIONSHIP_EDGE_ID =
"relationshipEdgeId";
protected final AtlasGraph graph;
protected final EntityGraphMapper entityGraphMapper;
diff --git
a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 97b9980..5b4bf71 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -116,6 +116,10 @@ public class TaskManagement implements Service,
ActiveStateChangeHandler {
return this.registry.createVertex(taskType, createdBy, parameters);
}
+ public List<AtlasTask> getPendingTasks() {
+ return this.registry.getPendingTasks();
+ }
+
public List<AtlasTask> getAll() {
return this.registry.getAll();
}
diff --git
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index d440f2f..15c7159 100644
---
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.tagpropagation;
+import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
@@ -51,6 +52,7 @@ import java.util.List;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -204,7 +206,11 @@ public class ClassificationPropagationWithTasksTest
extends AtlasTestBase {
AtlasVertex entityVertex =
AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex =
GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
- entityStore.deleteClassification(HDFS_PATH_EMPLOYEES,
tagX.getTypeName());
+ try {
+ entityStore.deleteClassification(HDFS_PATH_EMPLOYEES,
tagX.getTypeName());
+ } catch (AtlasBaseException e) {
+ assertEquals(e.getAtlasErrorCode(),
AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED);
+ }
assertNotNull(entityVertex);
assertNotNull(classificationVertex);