This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 7feb60a11549b53d8ba9c639abdd2dac06ce6c90 Author: chaitalicod <[email protected]> AuthorDate: Mon Sep 22 21:13:28 2025 +0530 ATLAS-5055: Incremental Export : When entity exported has a tag propagated from entity which is deleted , tag is not propagated to it at target (#434) Co-authored-by: chaitalithombare <[email protected]> (cherry picked from commit 0d00b7303e87aec67c56955fc34dc72ccf5ca665) --- .../repository/store/graph/v1/DeleteHandlerV1.java | 10 ++++--- .../store/graph/v2/EntityGraphMapper.java | 13 +++++---- .../store/graph/v2/EntityGraphRetriever.java | 4 +-- .../store/graph/v2/tasks/ClassificationTask.java | 15 +++++++++++ .../ClassificationPropagationWithTasksTest.java | 30 +++++++++++++++++++++ .../src/test/resources/deleted_tab_propagation.zip | Bin 0 -> 26200 bytes 6 files changed, 60 insertions(+), 12 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 5d83f2934..378fd762a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -957,10 +957,10 @@ public abstract class DeleteHandlerV1 { } } - public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, String classificationName) { + public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid, String classificationName, boolean isImportInProgress) { String currentUser = RequestContext.getCurrentUser(); String entityGuid = GraphHelper.getGuid(entityVertex); - Map<String, Object> taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid, classificationName); + Map<String, Object> taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid, classificationName, isImportInProgress); AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams); AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); @@ -1280,10 +1280,11 @@ public abstract class DeleteHandlerV1 { private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException { final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex); String relationshipGuid = getRelationshipGuid(edge); + boolean isImportInProgress = RequestContext.get().isImportInProgress(); if (taskManagement != null && DEFERRED_ACTION_ENABLED) { for (AtlasVertex classificationVertex : classificationVertices) { - createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, classificationVertex.getIdForDisplay(), relationshipGuid, GraphHelper.getClassificationName(classificationVertex)); + createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, classificationVertex.getIdForDisplay(), relationshipGuid, GraphHelper.getClassificationName(classificationVertex), isImportInProgress); } } else { final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) : null; @@ -1347,6 +1348,7 @@ public abstract class DeleteHandlerV1 { */ private void deleteAllClassifications(AtlasVertex instanceVertex) throws AtlasBaseException { List<AtlasEdge> classificationEdges = getAllClassificationEdges(instanceVertex); + boolean isImportInProgress = RequestContext.get().isImportInProgress(); for (AtlasEdge edge : classificationEdges) { AtlasVertex classificationVertex = edge.getInVertex(); @@ -1355,7 +1357,7 @@ public abstract class DeleteHandlerV1 { if (isClassificationEdge && removePropagations) { if (taskManagement != null && DEFERRED_ACTION_ENABLED) { - createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, instanceVertex, classificationVertex.getIdForDisplay(), null, GraphHelper.getClassificationName(classificationVertex)); + createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, instanceVertex, classificationVertex.getIdForDisplay(), null, GraphHelper.getClassificationName(classificationVertex), isImportInProgress); } else { removeTagPropagation(classificationVertex); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 02f86866f..3b7f4a510 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -792,6 +792,7 @@ public class EntityGraphMapper { List<AtlasVertex> entitiesToPropagateTo = null; Map<AtlasClassification, HashSet<AtlasVertex>> addedClassifications = new HashMap<>(); List<AtlasClassification> addClassifications = new ArrayList<>(classifications.size()); + boolean isImportInProgress = RequestContext.get().isImportInProgress(); for (AtlasClassification c : classifications) { AtlasClassification classification = new AtlasClassification(c); @@ -851,7 +852,7 @@ public class EntityGraphMapper { if (propagateTags && taskManagement != null && deferredActionEnabled) { propagateTags = false; - createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, entityVertex, classificationVertex.getIdForDisplay(), classificationName); + createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, entityVertex, classificationVertex.getIdForDisplay(), classificationName, isImportInProgress); } // add the attributes for the trait instance @@ -981,6 +982,7 @@ public class EntityGraphMapper { } AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, entityGuid); + boolean isImportInProgress = RequestContext.get().isImportInProgress(); if (entityVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid); @@ -1023,7 +1025,7 @@ public class EntityGraphMapper { throw new AtlasBaseException(AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED, classificationVertexId, entityGuid); } - createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertexId, classificationName); + createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertexId, classificationName, isImportInProgress); entityVertices = new ArrayList<>(); } else { @@ -1101,6 +1103,7 @@ public class EntityGraphMapper { } AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); + boolean isImportInProgress = RequestContext.get().isImportInProgress(); if (entityVertex == null) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); @@ -1215,7 +1218,7 @@ public class EntityGraphMapper { if (updatedTagPropagation != null && taskManagement != null && deferredActionEnabled) { String propagationType = updatedTagPropagation ? CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE; - createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), classificationName); + createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay(), classificationName, isImportInProgress); updatedTagPropagation = null; } @@ -2834,7 +2837,7 @@ public class EntityGraphMapper { attributes.put(bmAttribute.getName(), attrValue); } - private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String classificationName) { - deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null, classificationName); + private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String classificationName, boolean isImportInProgress) { + deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null, classificationName, isImportInProgress); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 765aa7ddc..1043589b1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -18,7 +18,6 @@ package org.apache.atlas.repository.store.graph.v2; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; @@ -164,7 +163,6 @@ public class EntityGraphRetriever { private static final String GLOSSARY_CATEGORY_HIERARCHY_EDGE_LABEL = "r:AtlasGlossaryCategoryHierarchyLink"; private static final String GLOSSARY_CATEGORY_TYPE_NAME = AtlasGlossaryCategory.class.getSimpleName(); private static final String PARENT_GLOSSARY_CATEGORY_GUID = "parentCategoryGuid"; - private boolean deferredActionEnabled = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); private static final TypeReference<List<TimeBoundary>> TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {}; @@ -701,7 +699,7 @@ public class EntityGraphRetriever { Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges); for (AtlasEdge propagationEdge : propagationEdges) { - if (getEdgeStatus(propagationEdge) != ACTIVE && !deferredActionEnabled && !RequestContext.get().isImportInProgress()) { + if (getEdgeStatus(propagationEdge) != ACTIVE && !RequestContext.get().isImportInProgress()) { continue; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java index a614a7a84..1822581d4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java @@ -50,6 +50,8 @@ public abstract class ClassificationTask extends AbstractTask { public static final String PARAM_RELATIONSHIP_GUID = "relationshipGuid"; public static final String PARAM_RELATIONSHIP_OBJECT = "relationshipObject"; public static final String PARAM_RELATIONSHIP_EDGE_ID = "relationshipEdgeId"; + public static final String PARAM_IMPORT_IN_PROGRESS = "isImportInProgress"; + public static final boolean DEFAULT_IMPORT_IN_PROGRESS = false; protected final AtlasGraph graph; protected final EntityGraphMapper entityGraphMapper; @@ -66,12 +68,17 @@ public abstract class ClassificationTask extends AbstractTask { } public static Map<String, Object> toParameters(String entityGuid, String classificationVertexId, String relationshipGuid, String classificationName) { + return toParameters(entityGuid, classificationVertexId, relationshipGuid, classificationName, false); + } + + public static Map<String, Object> toParameters(String entityGuid, String classificationVertexId, String relationshipGuid, String classificationName, boolean isImportInProgress) { Map<String, Object> ret = new HashMap<>(); ret.put(PARAM_ENTITY_GUID, entityGuid); ret.put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId); ret.put(PARAM_CLASSIFICATION_NAME, classificationName); ret.put(PARAM_RELATIONSHIP_GUID, relationshipGuid); + ret.put(PARAM_IMPORT_IN_PROGRESS, isImportInProgress); return ret; } @@ -119,6 +126,14 @@ public abstract class ClassificationTask extends AbstractTask { return FAILED; } + Object obj = params.get(PARAM_IMPORT_IN_PROGRESS); + if (obj != null) { + LOG.debug("Task: {}: Setting import progress set to: {}", getTaskGuid(), obj); + RequestContext.get().setImportInProgress((Boolean) obj); + }else { + RequestContext.get().setImportInProgress(DEFAULT_IMPORT_IN_PROGRESS); + } + RequestContext.get().setUser(userName, null); try { diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java index 702171cc3..2eaf6d57d 100644 --- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java @@ -51,6 +51,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex; +import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson; import static org.testng.Assert.assertEquals; @@ -63,8 +65,14 @@ import static org.testng.Assert.assertTrue; public class ClassificationPropagationWithTasksTest extends AtlasTestBase { private static final String IMPORT_FILE = "tag-propagation-data.zip"; + private static final String IMPORT_DELETE_FILE = "deleted_tab_propagation.zip"; + private static final String HDFS_PATH_EMPLOYEES = "a3955120-ac17-426f-a4af-972ec8690e5f"; + private static final String HIVE_TABLE = "089c1ad4-9dde-4f9e-80c8-12a3046be337"; + + private static final String HIVE_TABLE_CTAS = "e83551b7-bbef-45aa-99d5-3d98c0ac737b"; + @Inject private AtlasTypeDefStore typeDefStore; @@ -211,6 +219,28 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase { assertNotNull(impactedEntities); } + @Test + public void runImportForDeletedEntityLineage() throws Exception { + runImportWithNoParameters(importService, getZipSource(IMPORT_DELETE_FILE)); + final String tagName = "classification1"; + + AtlasEntity hiveTable = getEntity(HIVE_TABLE); + AtlasEntity hiveTableCtas = getEntity(HIVE_TABLE_CTAS); + + AtlasVertex parentEntityVertex = AtlasGraphUtilsV2.findByGuid(hiveTable.getGuid()); + + AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hiveTableCtas.getGuid()); + + AtlasVertex classificationVertex = getClassificationVertex(parentEntityVertex, tagName); + assertNotNull(entityVertex); + assertNotNull(parentEntityVertex); + assertNotNull(classificationVertex); + + List<String> propagatedTraitNames = getPropagatedTraitNames(entityVertex); + + assertNotNull(propagatedTraitNames); + } + private void loadModelFilesAndImportTestData() { try { loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry); diff --git a/repository/src/test/resources/deleted_tab_propagation.zip b/repository/src/test/resources/deleted_tab_propagation.zip new file mode 100644 index 000000000..ace129350 Binary files /dev/null and b/repository/src/test/resources/deleted_tab_propagation.zip differ
