This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 9e639a9e0 ATLAS-5056: Entity audit should not be created for instances
of internal entity-types (#471)
9e639a9e0 is described below
commit 9e639a9e00dc2445b4f717dccdf47b19f373b8c1
Author: Aditya Gupta <[email protected]>
AuthorDate: Thu Feb 5 14:50:32 2026 +0530
ATLAS-5056: Entity audit should not be created for instances of internal
entity-types (#471)
---
.../store/graph/v2/AtlasEntityChangeNotifier.java | 23 +-
.../v2/bulkimport/EntityChangeNotifierNopTest.java | 311 +++++++++++++++++++++
2 files changed, 325 insertions(+), 9 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 527679c7c..daad019a4 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -33,7 +33,6 @@ import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.notification.EntityNotification;
-import org.apache.atlas.repository.audit.AtlasAuditService;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphHelper;
@@ -59,7 +58,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
-import java.util.function.Predicate;
+import java.util.stream.Collectors;
import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
@@ -69,8 +68,6 @@ import static
org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
private static final Logger LOG =
LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
- private static final Predicate<AtlasEntityHeader>
PRED_IS_NOT_TYPE_AUDIT_ENTITY = obj ->
!obj.getTypeName().equals(AtlasAuditService.ENTITY_TYPE_AUDIT_ENTRY);
-
private final Set<EntityChangeListener> entityChangeListeners;
private final Set<EntityChangeListenerV2> entityChangeListenersV2;
private final AtlasInstanceConverter instanceConverter;
@@ -400,24 +397,32 @@ public class AtlasEntityChangeNotifier implements
IAtlasEntityChangeNotifier {
return listener.getClass().getSimpleName();
}
- private boolean skipAuditEntries(List<AtlasEntityHeader> entityHeaders) {
- return CollectionUtils.isEmpty(entityHeaders) ||
entityHeaders.stream().noneMatch(PRED_IS_NOT_TYPE_AUDIT_ENTITY);
+ private List<AtlasEntityHeader>
filterNonInternalEntities(List<AtlasEntityHeader> entityHeaders) {
+ if (CollectionUtils.isEmpty(entityHeaders)) {
+ return Collections.emptyList();
+ }
+
+ return entityHeaders.stream().filter(atlasEntityHeader ->
!GraphHelper.isInternalType(atlasEntityHeader.getTypeName())).collect(Collectors.toList());
}
private void notifyListeners(List<AtlasEntityHeader> entityHeaders,
EntityOperation operation, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityHeaders)) {
return;
}
- if (skipAuditEntries(entityHeaders)) {
+
+ List<AtlasEntityHeader> nonInternalEntities =
filterNonInternalEntities(entityHeaders);
+
+ if (CollectionUtils.isEmpty(nonInternalEntities)) {
+ LOG.info("Skipping notifications: All entities are internal
types");
return;
}
MetricRecorder metric =
RequestContext.get().startMetricRecord("notifyListeners");
if (isV2EntityNotificationEnabled) {
- notifyV2Listeners(entityHeaders, operation, isImport);
+ notifyV2Listeners(nonInternalEntities, operation, isImport);
} else {
- notifyV1Listeners(entityHeaders, operation, isImport);
+ notifyV1Listeners(nonInternalEntities, operation, isImport);
}
RequestContext.get().endMetricRecord(metric);
diff --git
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNopTest.java
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNopTest.java
index 20e62de5f..3cb183e2a 100644
---
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNopTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNopTest.java
@@ -17,13 +17,24 @@
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.FullTextMapperV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -34,16 +45,60 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class EntityChangeNotifierNopTest {
private EntityChangeNotifierNop entityChangeNotifierNop;
+ private AtlasEntityChangeNotifier atlasEntityChangeNotifier;
+ private EntityChangeListenerV2 mockListener;
+ private AtlasInstanceConverter mockInstanceConverter;
+ private FullTextMapperV2 mockFullTextMapper;
+ private AtlasTypeRegistry mockTypeRegistry;
+
@BeforeMethod
public void setUp() {
entityChangeNotifierNop = new EntityChangeNotifierNop();
}
+ @BeforeMethod
+ public void setUpAtlasEntityChangeNotifier() {
+ RequestContext.clear();
+ RequestContext.get();
+
+ mockListener = mock(EntityChangeListenerV2.class);
+ mockInstanceConverter = mock(AtlasInstanceConverter.class);
+ mockFullTextMapper = mock(FullTextMapperV2.class);
+ mockTypeRegistry = mock(AtlasTypeRegistry.class);
+
+ // Need to provide both V1 and V2 listeners to pass the isEmpty check
+ // The actual test is focused on V2 listeners
+ Set<EntityChangeListenerV2> listenersV2 = new HashSet<>();
+ listenersV2.add(mockListener);
+
+ // Create a mock V1 listener to pass the isEmpty check
+ EntityChangeListener mockV1Listener = mock(EntityChangeListener.class);
+ Set<EntityChangeListener> listenersV1 = new HashSet<>();
+ listenersV1.add(mockV1Listener);
+
+ atlasEntityChangeNotifier = new AtlasEntityChangeNotifier(
+ listenersV1,
+ listenersV2,
+ mockInstanceConverter,
+ mockFullTextMapper,
+ mockTypeRegistry);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ RequestContext.clear();
+ }
+
@Test
public void testOnEntitiesMutated() {
EntityMutationResponse mockResponse =
mock(EntityMutationResponse.class);
@@ -236,4 +291,260 @@ public class EntityChangeNotifierNopTest {
// Test with empty attributes
entityChangeNotifierNop.onBusinessAttributesUpdated(entityGuid, new
HashMap<>());
}
+
+ /**
+ * Scenario 1: Single normal hive_table entity
+ * Expected: Notification should be sent
+ */
+ @Test
+ public void testNotifyListeners_SingleNormalEntity() throws
AtlasBaseException {
+ // Create a single hive_table entity header
+ AtlasEntityHeader hiveTableHeader = new AtlasEntityHeader();
+ hiveTableHeader.setTypeName("hive_table");
+ hiveTableHeader.setGuid("hive-table-guid-001");
+ hiveTableHeader.setDisplayText("audit_test_table1");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("name", "audit_test_table1");
+ attributes.put("qualifiedName", "audit_test_db.audit_test_table1@cm");
+ attributes.put("owner", "hive");
+ hiveTableHeader.setAttributes(attributes);
+
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+ mutationResponse.addEntity(EntityOperation.CREATE, hiveTableHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should be called with the entity (filtering should
allow normal entities through)
+ verify(mockListener, times(1)).onEntitiesAdded(anyList(),
anyBoolean());
+ }
+
+ /**
+ * Scenario 2: Single internal type entity (__AtlasAuditEntry)
+ * Expected: Notification should NOT be sent
+ */
+ @Test
+ public void testNotifyListeners_SingleInternalEntity_AuditEntry() throws
AtlasBaseException {
+ // Create a single __AtlasAuditEntry entity header
+ AtlasEntityHeader auditEntryHeader = new AtlasEntityHeader();
+ auditEntryHeader.setTypeName("__AtlasAuditEntry");
+ auditEntryHeader.setGuid("audit-entry-guid-001");
+ auditEntryHeader.setDisplayText("__audit_entry_001");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("qualifiedName", "__audit_entry_001");
+ attributes.put("entityId", "dummy-entity-id");
+ attributes.put("entityType", "hive_table");
+ attributes.put("operation", "OTHERS");
+ auditEntryHeader.setAttributes(attributes);
+
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+ mutationResponse.addEntity(EntityOperation.CREATE, auditEntryHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should NOT be called (internal entity filtered out)
+ verify(mockListener, never()).onEntitiesAdded(anyList(), anyBoolean());
+ }
+
+ /**
+ * Scenario 2b: Single internal type entity (__AtlasUserProfile)
+ * Expected: Notification should NOT be sent
+ */
+ @Test
+ public void testNotifyListeners_SingleInternalEntity_UserProfile() throws
AtlasBaseException {
+ // Create a single __AtlasUserProfile entity header
+ AtlasEntityHeader userProfileHeader = new AtlasEntityHeader();
+ userProfileHeader.setTypeName("__AtlasUserProfile");
+ userProfileHeader.setGuid("user-profile-guid-001");
+ userProfileHeader.setDisplayText("__user_profile_001");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("qualifiedName", "__user_profile_001");
+ attributes.put("name", "testuser");
+ attributes.put("fullName", "Test User");
+ userProfileHeader.setAttributes(attributes);
+
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+ mutationResponse.addEntity(EntityOperation.CREATE, userProfileHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should NOT be called (internal entity filtered out)
+ verify(mockListener, never()).onEntitiesAdded(anyList(), anyBoolean());
+ }
+
+ /**
+ * Scenario 2c: Single internal type entity (__AtlasMetricsStat)
+ * Expected: Notification should NOT be sent
+ */
+ @Test
+ public void testNotifyListeners_SingleInternalEntity_MetricsStat() throws
AtlasBaseException {
+ // Create a single __AtlasMetricsStat entity header
+ AtlasEntityHeader metricsStatHeader = new AtlasEntityHeader();
+ metricsStatHeader.setTypeName("__AtlasMetricsStat");
+ metricsStatHeader.setGuid("metrics-stat-guid-001");
+ metricsStatHeader.setDisplayText("__metrics_stat_001");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("qualifiedName", "__metrics_stat_001");
+ attributes.put("metricsId", "metric_001");
+ attributes.put("metricName", "dummyMetric");
+ attributes.put("metricValue", 100);
+ metricsStatHeader.setAttributes(attributes);
+
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+ mutationResponse.addEntity(EntityOperation.CREATE, metricsStatHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should NOT be called (internal entity filtered out)
+ verify(mockListener, never()).onEntitiesAdded(anyList(), anyBoolean());
+ }
+
+ /**
+ * Scenario 3: Bulk normal hive_table entities
+ * Expected: Notification should be sent for all
+ */
+ @Test
+ public void testNotifyListeners_BulkNormalEntities() throws
AtlasBaseException {
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+
+ // Create multiple hive_table entity headers
+ for (int i = 1; i <= 3; i++) {
+ AtlasEntityHeader hiveTableHeader = new AtlasEntityHeader();
+ hiveTableHeader.setTypeName("hive_table");
+ hiveTableHeader.setGuid("hive-table-guid-00" + i);
+ hiveTableHeader.setDisplayText("audit_test_table" + i);
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("name", "audit_test_table" + i);
+ attributes.put("qualifiedName", "audit_test_db.audit_test_table" +
i + "@cm");
+ attributes.put("owner", "hive");
+ hiveTableHeader.setAttributes(attributes);
+
+ mutationResponse.addEntity(EntityOperation.CREATE,
hiveTableHeader);
+ }
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should be called with all entities
+ verify(mockListener, times(1)).onEntitiesAdded(anyList(),
anyBoolean());
+ }
+
+ /**
+ * Scenario 4: Bulk internal type entities
+ * Expected: Notification should NOT be sent
+ */
+ @Test
+ public void testNotifyListeners_BulkInternalEntities() throws
AtlasBaseException {
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+
+ // Add __AtlasAuditEntry
+ AtlasEntityHeader auditEntryHeader = new AtlasEntityHeader();
+ auditEntryHeader.setTypeName("__AtlasAuditEntry");
+ auditEntryHeader.setGuid("audit-entry-guid-001");
+ auditEntryHeader.setDisplayText("__audit_entry_001");
+ Map<String, Object> auditAttrs = new HashMap<>();
+ auditAttrs.put("qualifiedName", "__audit_entry_001");
+ auditEntryHeader.setAttributes(auditAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, auditEntryHeader);
+
+ // Add __AtlasUserProfile
+ AtlasEntityHeader userProfileHeader = new AtlasEntityHeader();
+ userProfileHeader.setTypeName("__AtlasUserProfile");
+ userProfileHeader.setGuid("user-profile-guid-001");
+ userProfileHeader.setDisplayText("__user_profile_001");
+ Map<String, Object> userAttrs = new HashMap<>();
+ userAttrs.put("qualifiedName", "__user_profile_001");
+ userProfileHeader.setAttributes(userAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, userProfileHeader);
+
+ // Add __AtlasMetricsStat
+ AtlasEntityHeader metricsStatHeader = new AtlasEntityHeader();
+ metricsStatHeader.setTypeName("__AtlasMetricsStat");
+ metricsStatHeader.setGuid("metrics-stat-guid-001");
+ metricsStatHeader.setDisplayText("__metrics_stat_001");
+ Map<String, Object> metricsAttrs = new HashMap<>();
+ metricsAttrs.put("qualifiedName", "__metrics_stat_001");
+ metricsStatHeader.setAttributes(metricsAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, metricsStatHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should NOT be called (all internal entities
filtered out)
+ verify(mockListener, never()).onEntitiesAdded(anyList(), anyBoolean());
+ }
+
+ /**
+ * Scenario 5: Bulk mixed (normal and internal type entities)
+ * Expected: Notification should be sent ONLY for normal entities
+ */
+ @Test
+ public void testNotifyListeners_BulkMixedEntities() throws
AtlasBaseException {
+ // Create mutation response
+ EntityMutationResponse mutationResponse = new EntityMutationResponse();
+
+ // Add normal hive_table entities
+ for (int i = 1; i <= 3; i++) {
+ AtlasEntityHeader hiveTableHeader = new AtlasEntityHeader();
+ hiveTableHeader.setTypeName("hive_table");
+ hiveTableHeader.setGuid("hive-table-guid-5" + i);
+ hiveTableHeader.setDisplayText("audit_test_table5" + i);
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put("name", "audit_test_table5" + i);
+ attributes.put("qualifiedName", "audit_test_db.audit_test_table5"
+ i + "@cm");
+ attributes.put("owner", "hive");
+ hiveTableHeader.setAttributes(attributes);
+
+ mutationResponse.addEntity(EntityOperation.CREATE,
hiveTableHeader);
+ }
+
+ // Add internal entities
+ AtlasEntityHeader auditEntryHeader = new AtlasEntityHeader();
+ auditEntryHeader.setTypeName("__AtlasAuditEntry");
+ auditEntryHeader.setGuid("audit-entry-guid-051");
+ auditEntryHeader.setDisplayText("__audit_entry_051");
+ Map<String, Object> auditAttrs = new HashMap<>();
+ auditAttrs.put("qualifiedName", "__audit_entry_051");
+ auditEntryHeader.setAttributes(auditAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, auditEntryHeader);
+
+ AtlasEntityHeader userProfileHeader = new AtlasEntityHeader();
+ userProfileHeader.setTypeName("__AtlasUserProfile");
+ userProfileHeader.setGuid("user-profile-guid-051");
+ userProfileHeader.setDisplayText("__user_profile_051");
+ Map<String, Object> userAttrs = new HashMap<>();
+ userAttrs.put("qualifiedName", "__user_profile_051");
+ userProfileHeader.setAttributes(userAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, userProfileHeader);
+
+ AtlasEntityHeader metricsStatHeader = new AtlasEntityHeader();
+ metricsStatHeader.setTypeName("__AtlasMetricsStat");
+ metricsStatHeader.setGuid("metrics-stat-guid-051");
+ metricsStatHeader.setDisplayText("__metrics_stat_051");
+ Map<String, Object> metricsAttrs = new HashMap<>();
+ metricsAttrs.put("qualifiedName", "__metrics_stat_051");
+ metricsStatHeader.setAttributes(metricsAttrs);
+ mutationResponse.addEntity(EntityOperation.CREATE, metricsStatHeader);
+
+ // Execute
+ atlasEntityChangeNotifier.onEntitiesMutated(mutationResponse, false);
+
+ // Verify: listener should be called (only non-internal entities
passed)
+ verify(mockListener, times(1)).onEntitiesAdded(anyList(),
anyBoolean());
+ }
}