This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new 292af2e95 ATLAS-5208, ATLAS-5209: Ignore hook configs: Rename table
messages are not ignored, [Atlas Server config] Hive table related all entities
are ignored when set ignore pattern to ignore only hive_columns and
hive_table_ddl for matching qualified name pattern. (#518) (#528)
292af2e95 is described below
commit 292af2e9545fd4a136b58d1c9b9770e462eb45e5
Author: Amruth S <[email protected]>
AuthorDate: Fri Feb 13 23:08:37 2026 +0530
ATLAS-5208, ATLAS-5209: Ignore hook configs: Rename table messages are not
ignored, [Atlas Server config] Hive table related all entities are ignored when
set ignore pattern to ignore only hive_columns and hive_table_ddl for matching
qualified name pattern. (#518) (#528)
* ATLAS-5209: [Atlas Server config] Hive table related all entities are
ignored when set ignore pattern to ignore only hive_columns and hive_table_ddl
for matching qualified name pattern.
* ATLAS-5208: Ignore hook configs: Rename table messages are not ignored
* Fixed checkstyle violations
* Fixed test case failures
* Resolved review comments
---
.../main/java/org/apache/atlas/hook/AtlasHook.java | 97 ++++++++++++++++-
.../preprocessor/GenericEntityPreprocessor.java | 86 +++++++++++++--
.../GenericEntityPreprocessorTest.java | 121 ++++++++++++++++++++-
3 files changed, 292 insertions(+), 12 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 7373d9721..74e536b2a 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.notification.NotificationException;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -73,6 +75,9 @@ public abstract class AtlasHook {
public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN
= "atlas.hook.entity.ignore.pattern";
public static final String ATTRIBUTE_QUALIFIED_NAME
= "qualifiedName";
+ public static final String ATTRIBUTE_INPUTS
= "inputs";
+ public static final String ATTRIBUTE_OUTPUTS
= "outputs";
+
public static final boolean isRESTNotificationEnabled;
public static final boolean isHookMsgsSortEnabled;
@@ -262,6 +267,88 @@ public abstract class AtlasHook {
return entitiesWithExtInfo;
}
+ public static String getQualifiedName(Object obj) {
+ Map<String, Object> attributes = null;
+
+ if (obj instanceof AtlasObjectId) {
+ attributes = ((AtlasObjectId) obj).getUniqueAttributes();
+ } else if (obj instanceof Map) {
+ attributes = (Map) ((Map)
obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
+ } else if (obj instanceof AtlasEntity) {
+ attributes = ((AtlasEntity) obj).getAttributes();
+ } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+ attributes = ((AtlasEntity.AtlasEntityWithExtInfo)
obj).getEntity().getAttributes();
+ }
+
+ Object ret = attributes != null ?
attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+ return ret != null ? ret.toString() : null;
+ }
+
+ private static void filterProcessRelatedEntities(Object obj) {
+ if (obj == null || !(obj instanceof Collection)) {
+ return;
+ }
+
+ Collection objList = (Collection) obj;
+ List toRemove = new ArrayList();
+
+ for (Object entity : objList) {
+ String qualifiedName = getQualifiedName(entity);
+
+ if (isMatch(qualifiedName, entitiesToIgnore)) {
+ toRemove.add(entity);
+
+ LOG.info("Ignored entity {}", qualifiedName);
+ }
+ }
+
+ objList.removeAll(toRemove);
+ }
+
+ private static void filterRelationshipAttributes(Map<String, Object>
relationshipAttributes) {
+ if (relationshipAttributes == null) {
+ return;
+ }
+
+ List<String> keysToRemove = new ArrayList<>();
+ for (Map.Entry<String, Object> entry :
relationshipAttributes.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof Collection) {
+ Collection entities = (Collection) obj;
+
+ entities.removeIf((Object entity) -> {
+ String qualifiedName = getQualifiedName(entity);
+
+ return qualifiedName != null && isMatch(qualifiedName,
entitiesToIgnore);
+ });
+ } else {
+ String qualifiedName = getQualifiedName(obj);
+
+ if (qualifiedName != null && isMatch(qualifiedName,
entitiesToIgnore)) {
+ keysToRemove.add(entry.getKey());
+ }
+ }
+ }
+
+ for (String key : keysToRemove) {
+ relationshipAttributes.remove(key);
+
+ LOG.info("Ignored entity {}", key);
+ }
+ }
+
+ private static void filterEntityAttributes(AtlasEntity entity) {
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+ filterProcessRelatedEntities(inputs);
+ filterProcessRelatedEntities(outputs);
+
+ filterRelationshipAttributes(entity.getRelationshipAttributes());
+ }
+
private static void preprocessEntities(List<HookNotification>
hookNotifications) {
for (int i = 0; i < hookNotifications.size(); i++) {
HookNotification hookNotification = hookNotifications.get(i);
@@ -269,7 +356,7 @@ public abstract class AtlasHook {
AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo =
getAtlasEntitiesWithExtInfo(hookNotification);
if (entitiesWithExtInfo == null) {
- return;
+ continue;
}
List<AtlasEntity> entities = entitiesWithExtInfo.getEntities();
@@ -278,12 +365,20 @@ public abstract class AtlasHook {
entities.removeIf((AtlasEntity entity) ->
isMatch(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+ for (AtlasEntity entity : entities) {
+ filterEntityAttributes(entity);
+ }
+
Map<String, AtlasEntity> referredEntitiesMap =
entitiesWithExtInfo.getReferredEntities();
referredEntitiesMap = ((referredEntitiesMap != null) ?
referredEntitiesMap : Collections.emptyMap());
referredEntitiesMap.entrySet().removeIf((Map.Entry<String,
AtlasEntity> entry) ->
isMatch(entry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+ for (Map.Entry<String, AtlasEntity> entry :
referredEntitiesMap.entrySet()) {
+ filterEntityAttributes(entry.getValue());
+ }
+
if (CollectionUtils.isEmpty(entities) &&
CollectionUtils.isEmpty(referredEntitiesMap.values())) {
hookNotifications.remove(i--);
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
index a91da38e8..da3653611 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
@@ -20,7 +20,10 @@ package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
public class GenericEntityPreprocessor extends EntityPreprocessor {
@@ -36,27 +39,92 @@ public class GenericEntityPreprocessor extends
EntityPreprocessor {
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
- if (entity != null && isToBeIgnored(entity)) {
+ if (entity == null) {
+ return;
+ }
+
+ if (isToBeIgnored(entity)) {
context.addToIgnoredEntities(entity);
+ return;
}
+
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+ filterProcessRelatedEntities(inputs, context);
+ filterProcessRelatedEntities(outputs, context);
+
+ filterRelationshipAttributes(entity.getRelationshipAttributes(),
context);
}
private boolean isMatch(String property, List<Pattern> patterns) {
return patterns.stream().anyMatch((Pattern pattern) ->
pattern.matcher(property).matches());
}
- private boolean isToBeIgnored(AtlasEntity entity) {
+ private void filterProcessRelatedEntities(Object obj, PreprocessorContext
context) {
+ if (obj == null || !(obj instanceof Collection)) {
+ return;
+ }
+
+ Collection objList = (Collection) obj;
+ List toRemove = new ArrayList();
+
+ for (Object entity : objList) {
+ if (isToBeIgnored(entity)) {
+ context.addToIgnoredEntities(entity);
+ toRemove.add(entity);
+ }
+ }
+
+ objList.removeAll(toRemove);
+ }
+
+ private boolean isToBeIgnored(Object entity) {
String qualifiedName = getQualifiedName(entity);
- boolean decision;
+ String typeName = getTypeName(entity);
+ boolean decision = false;
- if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will
Ignore all entities whose qualified name matches the ignore pattern.
- decision = isMatch(qualifiedName, this.entitiesToIgnore);
- } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { // Will
Ignore all entities whose type matches the regex given.
- decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore);
- } else { // Combination of above 2 cases.
- decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore)
&& isMatch(qualifiedName, this.entitiesToIgnore);
+ if (qualifiedName != null && typeName != null) {
+ if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will
Ignore all entities whose qualified name matches the ignore pattern.
+ decision = isMatch(qualifiedName, this.entitiesToIgnore);
+ } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { //
Will Ignore all entities whose type matches the regex given.
+ decision = isMatch(typeName, this.entityTypesToIgnore);
+ } else { // Combination of above 2 cases.
+ decision = isMatch(typeName, this.entityTypesToIgnore) &&
isMatch(qualifiedName, this.entitiesToIgnore);
+ }
}
return decision;
}
+
+ private void filterRelationshipAttributes(Map<String, Object>
relationshipAttributes, PreprocessorContext context) {
+ if (relationshipAttributes == null) {
+ return;
+ }
+ List<String> keysToRemove = new ArrayList<>();
+
+ for (Map.Entry<String, Object> entry :
relationshipAttributes.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof Collection) {
+ Collection entities = (Collection) obj;
+
+ entities.removeIf((Object entity) -> {
+ if (isToBeIgnored(entity)) {
+ context.addToIgnoredEntities(entity);
+ return true;
+ }
+ return false;
+ });
+ } else {
+ if (isToBeIgnored(obj)) {
+ context.addToIgnoredEntities(obj);
+ keysToRemove.add(entry.getKey());
+ }
+ }
+ }
+ for (String key : keysToRemove) {
+ relationshipAttributes.remove(key);
+ }
+ }
}
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
index d75ca2574..463babc8d 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
@@ -22,7 +22,10 @@ import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
+import org.junit.Before;
+import org.junit.Test;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -34,14 +37,23 @@ import java.util.function.Predicate;
import java.util.regex.Pattern;
import static
org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class GenericEntityPreprocessorTest {
private final HookMessageDeserializer deserializer = new
HookMessageDeserializer();
+ private GenericEntityPreprocessor preprocessor;
+ private PreprocessorContext context;
+ private AtlasEntity entity;
public void testEntityTypesToIgnore(String msgJson, List<Pattern>
entityTypesToIgnore) {
- PreprocessorContext context = getPreprocessorContext(msgJson);
- List<AtlasEntity> entities = context.getEntities();
+ PreprocessorContext context = getPreprocessorContext(msgJson);
+ List<AtlasEntity> entities = context.getEntities();
Set<String> filteredEntitiesActual = filterEntity(entities,
(AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName()));
@@ -148,4 +160,109 @@ public class GenericEntityPreprocessorTest {
return filteredEntitiesActual;
}
+
+ @Before
+ public void setup() {
+ preprocessor = new GenericEntityPreprocessor(
+ Collections.singletonList(Pattern.compile("dbType.*")), //
entityTypesToIgnore
+ Collections.singletonList(Pattern.compile(".*ignore.*"))); //
entitiesToIgnore
+ context = mock(PreprocessorContext.class);
+ entity = mock(AtlasEntity.class);
+ }
+
+ @Test
+ public void testPreprocess_ShouldIgnore_WhenEntityMatches() {
+ when(entity.getTypeName()).thenReturn("dbType123");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"my_ignore_table"));
+
+ preprocessor.preprocess(entity, context);
+
+ verify(context).addToIgnoredEntities(entity);
+ }
+
+ @Test
+ public void testPreprocess_ShouldNotIgnore_WhenEntityDoesNotMatch() {
+ when(entity.getTypeName()).thenReturn("normalType");
+ when(entity.getAttribute("qualifiedName")).thenReturn("safe_table");
+
+ preprocessor.preprocess(entity, context);
+
+ verify(context, never()).addToIgnoredEntities(entity);
+ }
+
+ @Test
+ public void testIsMatch_PrivateMethod_Positive() throws Exception {
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isMatch", String.class,
List.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(preprocessor, "dbType123",
+ Collections.singletonList(Pattern.compile("dbType.*")));
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testIsMatch_PrivateMethod_Negative() throws Exception {
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isMatch", String.class,
List.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(preprocessor, "unknownType",
+ Collections.singletonList(Pattern.compile("dbType.*")));
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testIsToBeIgnored_ByTypeOnly() throws Exception {
+ GenericEntityPreprocessor typeOnlyPreprocessor =
+ new
GenericEntityPreprocessor(Collections.singletonList(Pattern.compile("dbType.*")),
Collections.emptyList());
+
+ when(entity.getTypeName()).thenReturn("dbTypeXYZ");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"someTable"));
+
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(typeOnlyPreprocessor, entity);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testIsToBeIgnored_ByQualifiedNameOnly() throws Exception {
+ GenericEntityPreprocessor qnameOnlyPreprocessor =
+ new GenericEntityPreprocessor(Collections.emptyList(),
Collections.singletonList(Pattern.compile(".*ignore.*")));
+
+ when(entity.getTypeName()).thenReturn("anyType");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"my_ignore_table"));
+
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(qnameOnlyPreprocessor,
entity);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testIsToBeIgnored_BothConditions() throws Exception {
+ when(entity.getTypeName()).thenReturn("dbType123");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"some_ignore_table"));
+
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(preprocessor, entity);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testIsToBeIgnored_NoMatch() throws Exception {
+ when(entity.getTypeName()).thenReturn("normalType");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"safe_table"));
+
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
+ method.setAccessible(true);
+
+ boolean result = (boolean) method.invoke(preprocessor, entity);
+ assertFalse(result);
+ }
}