This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 7feb60a11549b53d8ba9c639abdd2dac06ce6c90
Author: chaitalicod <[email protected]>
AuthorDate: Mon Sep 22 21:13:28 2025 +0530

    ATLAS-5055: Incremental Export: When an exported entity has a tag propagated from a deleted entity, the tag is not propagated to it at the target (#434)
    
    Co-authored-by: chaitalithombare <[email protected]>
    (cherry picked from commit 0d00b7303e87aec67c56955fc34dc72ccf5ca665)
---
 .../repository/store/graph/v1/DeleteHandlerV1.java |  10 ++++---
 .../store/graph/v2/EntityGraphMapper.java          |  13 +++++----
 .../store/graph/v2/EntityGraphRetriever.java       |   4 +--
 .../store/graph/v2/tasks/ClassificationTask.java   |  15 +++++++++++
 .../ClassificationPropagationWithTasksTest.java    |  30 +++++++++++++++++++++
 .../src/test/resources/deleted_tab_propagation.zip | Bin 0 -> 26200 bytes
 6 files changed, 60 insertions(+), 12 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 5d83f2934..378fd762a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -957,10 +957,10 @@ public abstract class DeleteHandlerV1 {
         }
     }
 
-    public void createAndQueueTask(String taskType, AtlasVertex entityVertex, 
String classificationVertexId, String relationshipGuid, String 
classificationName) {
+    public void createAndQueueTask(String taskType, AtlasVertex entityVertex, 
String classificationVertexId, String relationshipGuid, String 
classificationName, boolean isImportInProgress) {
         String              currentUser = RequestContext.getCurrentUser();
         String              entityGuid  = GraphHelper.getGuid(entityVertex);
-        Map<String, Object> taskParams  = 
ClassificationTask.toParameters(entityGuid, classificationVertexId, 
relationshipGuid, classificationName);
+        Map<String, Object> taskParams  = 
ClassificationTask.toParameters(entityGuid, classificationVertexId, 
relationshipGuid, classificationName, isImportInProgress);
         AtlasTask           task        = taskManagement.createTask(taskType, 
currentUser, taskParams);
 
         AtlasGraphUtilsV2.addEncodedProperty(entityVertex, 
PENDING_TASKS_PROPERTY_KEY, task.getGuid());
@@ -1280,10 +1280,11 @@ public abstract class DeleteHandlerV1 {
     private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex 
toVertex, AtlasEdge edge) throws AtlasBaseException {
         final List<AtlasVertex> classificationVertices = 
getPropagationEnabledClassificationVertices(fromVertex);
         String                  relationshipGuid       = 
getRelationshipGuid(edge);
+        boolean isImportInProgress                     = 
RequestContext.get().isImportInProgress();
 
         if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
             for (AtlasVertex classificationVertex : classificationVertices) {
-                createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, 
classificationVertex.getIdForDisplay(), relationshipGuid, 
GraphHelper.getClassificationName(classificationVertex));
+                createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, 
classificationVertex.getIdForDisplay(), relationshipGuid, 
GraphHelper.getClassificationName(classificationVertex), isImportInProgress);
             }
         } else {
             final List<AtlasVertex> propagatedEntityVertices = 
CollectionUtils.isNotEmpty(classificationVertices) ? 
entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) : 
null;
@@ -1347,6 +1348,7 @@ public abstract class DeleteHandlerV1 {
      */
     private void deleteAllClassifications(AtlasVertex instanceVertex) throws 
AtlasBaseException {
         List<AtlasEdge> classificationEdges = 
getAllClassificationEdges(instanceVertex);
+        boolean isImportInProgress = RequestContext.get().isImportInProgress();
 
         for (AtlasEdge edge : classificationEdges) {
             AtlasVertex classificationVertex = edge.getInVertex();
@@ -1355,7 +1357,7 @@ public abstract class DeleteHandlerV1 {
 
             if (isClassificationEdge && removePropagations) {
                 if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
-                    createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
instanceVertex, classificationVertex.getIdForDisplay(), null, 
GraphHelper.getClassificationName(classificationVertex));
+                    createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
instanceVertex, classificationVertex.getIdForDisplay(), null, 
GraphHelper.getClassificationName(classificationVertex), isImportInProgress);
                 } else {
                     removeTagPropagation(classificationVertex);
                 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 02f86866f..3b7f4a510 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -792,6 +792,7 @@ public class EntityGraphMapper {
             List<AtlasVertex>                              
entitiesToPropagateTo = null;
             Map<AtlasClassification, HashSet<AtlasVertex>> 
addedClassifications  = new HashMap<>();
             List<AtlasClassification>                      addClassifications  
  = new ArrayList<>(classifications.size());
+            boolean isImportInProgress                                         
  = RequestContext.get().isImportInProgress();
 
             for (AtlasClassification c : classifications) {
                 AtlasClassification classification     = new 
AtlasClassification(c);
@@ -851,7 +852,7 @@ public class EntityGraphMapper {
                 if (propagateTags && taskManagement != null && 
deferredActionEnabled) {
                     propagateTags = false;
 
-                    createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, 
entityVertex, classificationVertex.getIdForDisplay(), classificationName);
+                    createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, 
entityVertex, classificationVertex.getIdForDisplay(), classificationName, 
isImportInProgress);
                 }
 
                 // add the attributes for the trait instance
@@ -981,6 +982,7 @@ public class EntityGraphMapper {
         }
 
         AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, 
entityGuid);
+        boolean isImportInProgress = RequestContext.get().isImportInProgress();
 
         if (entityVertex == null) {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
@@ -1023,7 +1025,7 @@ public class EntityGraphMapper {
                     throw new 
AtlasBaseException(AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED, 
classificationVertexId, entityGuid);
                 }
 
-                createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
entityVertex, classificationVertexId, classificationName);
+                createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, 
entityVertex, classificationVertexId, classificationName, isImportInProgress);
 
                 entityVertices = new ArrayList<>();
             } else {
@@ -1101,6 +1103,7 @@ public class EntityGraphMapper {
         }
 
         AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, 
guid);
+        boolean isImportInProgress = RequestContext.get().isImportInProgress();
 
         if (entityVertex == null) {
             throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
@@ -1215,7 +1218,7 @@ public class EntityGraphMapper {
             if (updatedTagPropagation != null && taskManagement != null && 
deferredActionEnabled) {
                 String propagationType = updatedTagPropagation ? 
CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE;
 
-                createAndQueueTask(propagationType, entityVertex, 
classificationVertex.getIdForDisplay(), classificationName);
+                createAndQueueTask(propagationType, entityVertex, 
classificationVertex.getIdForDisplay(), classificationName, isImportInProgress);
 
                 updatedTagPropagation = null;
             }
@@ -2834,7 +2837,7 @@ public class EntityGraphMapper {
         attributes.put(bmAttribute.getName(), attrValue);
     }
 
-    private void createAndQueueTask(String taskType, AtlasVertex entityVertex, 
String classificationVertexId, String classificationName) {
-        deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, 
classificationVertexId, null, classificationName);
+    private void createAndQueueTask(String taskType, AtlasVertex entityVertex, 
String classificationVertexId, String classificationName,  boolean 
isImportInProgress) {
+        deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, 
classificationVertexId, null, classificationName, isImportInProgress);
     }
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 765aa7ddc..1043589b1 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -164,7 +163,6 @@ public class EntityGraphRetriever {
     private static final String GLOSSARY_CATEGORY_HIERARCHY_EDGE_LABEL = 
"r:AtlasGlossaryCategoryHierarchyLink";
     private static final String GLOSSARY_CATEGORY_TYPE_NAME            = 
AtlasGlossaryCategory.class.getSimpleName();
     private static final String PARENT_GLOSSARY_CATEGORY_GUID          = 
"parentCategoryGuid";
-    private boolean             deferredActionEnabled                  = 
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
 
     private static final TypeReference<List<TimeBoundary>> 
TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {};
 
@@ -701,7 +699,7 @@ public class EntityGraphRetriever {
             Iterable<AtlasEdge> propagationEdges = 
entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
 
             for (AtlasEdge propagationEdge : propagationEdges) {
-                if (getEdgeStatus(propagationEdge) != ACTIVE && 
!deferredActionEnabled && !RequestContext.get().isImportInProgress()) {
+                if (getEdgeStatus(propagationEdge) != ACTIVE && 
!RequestContext.get().isImportInProgress()) {
                     continue;
                 }
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index a614a7a84..1822581d4 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -50,6 +50,8 @@ public abstract class ClassificationTask extends AbstractTask 
{
     public static final String PARAM_RELATIONSHIP_GUID        = 
"relationshipGuid";
     public static final String PARAM_RELATIONSHIP_OBJECT      = 
"relationshipObject";
     public static final String PARAM_RELATIONSHIP_EDGE_ID     = 
"relationshipEdgeId";
+    public static final String PARAM_IMPORT_IN_PROGRESS       = 
"isImportInProgress";
+    public static final boolean DEFAULT_IMPORT_IN_PROGRESS    = false;
 
     protected final AtlasGraph             graph;
     protected final EntityGraphMapper      entityGraphMapper;
@@ -66,12 +68,17 @@ public abstract class ClassificationTask extends 
AbstractTask {
     }
 
     public static Map<String, Object> toParameters(String entityGuid, String 
classificationVertexId, String relationshipGuid, String classificationName) {
+        return toParameters(entityGuid, classificationVertexId, 
relationshipGuid, classificationName, false);
+    }
+
+    public static Map<String, Object> toParameters(String entityGuid, String 
classificationVertexId, String relationshipGuid, String classificationName, 
boolean  isImportInProgress) {
         Map<String, Object> ret = new HashMap<>();
 
         ret.put(PARAM_ENTITY_GUID, entityGuid);
         ret.put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId);
         ret.put(PARAM_CLASSIFICATION_NAME, classificationName);
         ret.put(PARAM_RELATIONSHIP_GUID, relationshipGuid);
+        ret.put(PARAM_IMPORT_IN_PROGRESS, isImportInProgress);
 
         return ret;
     }
@@ -119,6 +126,14 @@ public abstract class ClassificationTask extends 
AbstractTask {
             return FAILED;
         }
 
+        Object obj = params.get(PARAM_IMPORT_IN_PROGRESS);
+        if (obj != null) {
+            LOG.debug("Task: {}: Setting import progress set to: {}", 
getTaskGuid(), obj);
+            RequestContext.get().setImportInProgress((Boolean) obj);
+        }else {
+            
RequestContext.get().setImportInProgress(DEFAULT_IMPORT_IN_PROGRESS);
+        }
+
         RequestContext.get().setUser(userName, null);
 
         try {
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index 702171cc3..2eaf6d57d 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -51,6 +51,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
 import static org.testng.Assert.assertEquals;
@@ -63,8 +65,14 @@ import static org.testng.Assert.assertTrue;
 public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
     private static final String IMPORT_FILE = "tag-propagation-data.zip";
 
+    private static final String IMPORT_DELETE_FILE = 
"deleted_tab_propagation.zip";
+
     private static final String HDFS_PATH_EMPLOYEES = 
"a3955120-ac17-426f-a4af-972ec8690e5f";
 
+    private static final String HIVE_TABLE = 
"089c1ad4-9dde-4f9e-80c8-12a3046be337";
+
+    private static final String HIVE_TABLE_CTAS = 
"e83551b7-bbef-45aa-99d5-3d98c0ac737b";
+
     @Inject
     private AtlasTypeDefStore typeDefStore;
 
@@ -211,6 +219,28 @@ public class ClassificationPropagationWithTasksTest 
extends AtlasTestBase {
         assertNotNull(impactedEntities);
     }
 
+    @Test
+    public void runImportForDeletedEntityLineage() throws Exception {
+        runImportWithNoParameters(importService, 
getZipSource(IMPORT_DELETE_FILE));
+        final String tagName = "classification1";
+
+        AtlasEntity         hiveTable = getEntity(HIVE_TABLE);
+        AtlasEntity         hiveTableCtas = getEntity(HIVE_TABLE_CTAS);
+
+        AtlasVertex parentEntityVertex   = 
AtlasGraphUtilsV2.findByGuid(hiveTable.getGuid());
+
+        AtlasVertex entityVertex         = 
AtlasGraphUtilsV2.findByGuid(hiveTableCtas.getGuid());
+
+        AtlasVertex classificationVertex = 
getClassificationVertex(parentEntityVertex, tagName);
+        assertNotNull(entityVertex);
+        assertNotNull(parentEntityVertex);
+        assertNotNull(classificationVertex);
+
+        List<String> propagatedTraitNames = 
getPropagatedTraitNames(entityVertex);
+
+        assertNotNull(propagatedTraitNames);
+    }
+
     private void loadModelFilesAndImportTestData() {
         try {
             loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, 
typeRegistry);
diff --git a/repository/src/test/resources/deleted_tab_propagation.zip 
b/repository/src/test/resources/deleted_tab_propagation.zip
new file mode 100644
index 000000000..ace129350
Binary files /dev/null and 
b/repository/src/test/resources/deleted_tab_propagation.zip differ

Reply via email to