This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new b780e9245 ATLAS-5208, ATLAS-5209: Ignore hook configs: Rename table
messages are not ignored, [Atlas Server config] Hive table related all entities
are ignored when set ignore pattern to ignore only hive_columns and
hive_table_ddl for matching qualified name pattern. (#518)
b780e9245 is described below
commit b780e9245a17e336c87c2bcd4ee52c2d339d1e28
Author: Amruth S <[email protected]>
AuthorDate: Fri Feb 13 22:07:38 2026 +0530
ATLAS-5208, ATLAS-5209: Ignore hook configs: Rename table messages are not
ignored, [Atlas Server config] Hive table related all entities are ignored when
set ignore pattern to ignore only hive_columns and hive_table_ddl for matching
qualified name pattern. (#518)
* ATLAS-5209: [Atlas Server config] Hive table related all entities are
ignored when set ignore pattern to ignore only hive_columns and hive_table_ddl
for matching qualified name pattern.
* ATLAS-5208: Ignore hook configs: Rename table messages are not ignored
* Fixed checkstyle violations
* Fixed test case failures
* Resolved review comments
---
.../main/java/org/apache/atlas/hook/AtlasHook.java | 97 +++++++++++++++++++++-
.../preprocessor/GenericEntityPreprocessor.java | 86 +++++++++++++++++--
.../GenericEntityPreprocessorTest.java | 18 ++--
3 files changed, 182 insertions(+), 19 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 7373d9721..74e536b2a 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.notification.NotificationException;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -73,6 +75,9 @@ public abstract class AtlasHook {
public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN
= "atlas.hook.entity.ignore.pattern";
public static final String ATTRIBUTE_QUALIFIED_NAME
= "qualifiedName";
+ public static final String ATTRIBUTE_INPUTS
= "inputs";
+ public static final String ATTRIBUTE_OUTPUTS
= "outputs";
+
public static final boolean isRESTNotificationEnabled;
public static final boolean isHookMsgsSortEnabled;
@@ -262,6 +267,88 @@ public abstract class AtlasHook {
return entitiesWithExtInfo;
}
+ public static String getQualifiedName(Object obj) {
+ Map<String, Object> attributes = null;
+
+ if (obj instanceof AtlasObjectId) {
+ attributes = ((AtlasObjectId) obj).getUniqueAttributes();
+ } else if (obj instanceof Map) {
+ attributes = (Map) ((Map)
obj).get(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
+ } else if (obj instanceof AtlasEntity) {
+ attributes = ((AtlasEntity) obj).getAttributes();
+ } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+ attributes = ((AtlasEntity.AtlasEntityWithExtInfo)
obj).getEntity().getAttributes();
+ }
+
+ Object ret = attributes != null ?
attributes.get(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+ return ret != null ? ret.toString() : null;
+ }
+
+ private static void filterProcessRelatedEntities(Object obj) {
+ if (obj == null || !(obj instanceof Collection)) {
+ return;
+ }
+
+ Collection objList = (Collection) obj;
+ List toRemove = new ArrayList();
+
+ for (Object entity : objList) {
+ String qualifiedName = getQualifiedName(entity);
+
+ if (isMatch(qualifiedName, entitiesToIgnore)) {
+ toRemove.add(entity);
+
+ LOG.info("Ignored entity {}", qualifiedName);
+ }
+ }
+
+ objList.removeAll(toRemove);
+ }
+
+ private static void filterRelationshipAttributes(Map<String, Object>
relationshipAttributes) {
+ if (relationshipAttributes == null) {
+ return;
+ }
+
+ List<String> keysToRemove = new ArrayList<>();
+ for (Map.Entry<String, Object> entry :
relationshipAttributes.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof Collection) {
+ Collection entities = (Collection) obj;
+
+ entities.removeIf((Object entity) -> {
+ String qualifiedName = getQualifiedName(entity);
+
+ return qualifiedName != null && isMatch(qualifiedName,
entitiesToIgnore);
+ });
+ } else {
+ String qualifiedName = getQualifiedName(obj);
+
+ if (qualifiedName != null && isMatch(qualifiedName,
entitiesToIgnore)) {
+ keysToRemove.add(entry.getKey());
+ }
+ }
+ }
+
+ for (String key : keysToRemove) {
+ relationshipAttributes.remove(key);
+
+ LOG.info("Ignored entity {}", key);
+ }
+ }
+
+ private static void filterEntityAttributes(AtlasEntity entity) {
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+ filterProcessRelatedEntities(inputs);
+ filterProcessRelatedEntities(outputs);
+
+ filterRelationshipAttributes(entity.getRelationshipAttributes());
+ }
+
private static void preprocessEntities(List<HookNotification>
hookNotifications) {
for (int i = 0; i < hookNotifications.size(); i++) {
HookNotification hookNotification = hookNotifications.get(i);
@@ -269,7 +356,7 @@ public abstract class AtlasHook {
AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo =
getAtlasEntitiesWithExtInfo(hookNotification);
if (entitiesWithExtInfo == null) {
- return;
+ continue;
}
List<AtlasEntity> entities = entitiesWithExtInfo.getEntities();
@@ -278,12 +365,20 @@ public abstract class AtlasHook {
entities.removeIf((AtlasEntity entity) ->
isMatch(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+ for (AtlasEntity entity : entities) {
+ filterEntityAttributes(entity);
+ }
+
Map<String, AtlasEntity> referredEntitiesMap =
entitiesWithExtInfo.getReferredEntities();
referredEntitiesMap = ((referredEntitiesMap != null) ?
referredEntitiesMap : Collections.emptyMap());
referredEntitiesMap.entrySet().removeIf((Map.Entry<String,
AtlasEntity> entry) ->
isMatch(entry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+ for (Map.Entry<String, AtlasEntity> entry :
referredEntitiesMap.entrySet()) {
+ filterEntityAttributes(entry.getValue());
+ }
+
if (CollectionUtils.isEmpty(entities) &&
CollectionUtils.isEmpty(referredEntitiesMap.values())) {
hookNotifications.remove(i--);
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
index a91da38e8..da3653611 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
@@ -20,7 +20,10 @@ package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
public class GenericEntityPreprocessor extends EntityPreprocessor {
@@ -36,27 +39,92 @@ public class GenericEntityPreprocessor extends
EntityPreprocessor {
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
- if (entity != null && isToBeIgnored(entity)) {
+ if (entity == null) {
+ return;
+ }
+
+ if (isToBeIgnored(entity)) {
context.addToIgnoredEntities(entity);
+ return;
}
+
+ Object inputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+ Object outputs = entity.getAttribute(ATTRIBUTE_OUTPUTS);
+
+ filterProcessRelatedEntities(inputs, context);
+ filterProcessRelatedEntities(outputs, context);
+
+ filterRelationshipAttributes(entity.getRelationshipAttributes(),
context);
}
private boolean isMatch(String property, List<Pattern> patterns) {
return patterns.stream().anyMatch((Pattern pattern) ->
pattern.matcher(property).matches());
}
- private boolean isToBeIgnored(AtlasEntity entity) {
+ private void filterProcessRelatedEntities(Object obj, PreprocessorContext
context) {
+ if (obj == null || !(obj instanceof Collection)) {
+ return;
+ }
+
+ Collection objList = (Collection) obj;
+ List toRemove = new ArrayList();
+
+ for (Object entity : objList) {
+ if (isToBeIgnored(entity)) {
+ context.addToIgnoredEntities(entity);
+ toRemove.add(entity);
+ }
+ }
+
+ objList.removeAll(toRemove);
+ }
+
+ private boolean isToBeIgnored(Object entity) {
String qualifiedName = getQualifiedName(entity);
- boolean decision;
+ String typeName = getTypeName(entity);
+ boolean decision = false;
- if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will
Ignore all entities whose qualified name matches the ignore pattern.
- decision = isMatch(qualifiedName, this.entitiesToIgnore);
- } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { // Will
Ignore all entities whose type matches the regex given.
- decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore);
- } else { // Combination of above 2 cases.
- decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore)
&& isMatch(qualifiedName, this.entitiesToIgnore);
+ if (qualifiedName != null && typeName != null) {
+ if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will
Ignore all entities whose qualified name matches the ignore pattern.
+ decision = isMatch(qualifiedName, this.entitiesToIgnore);
+ } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { //
Will Ignore all entities whose type matches the regex given.
+ decision = isMatch(typeName, this.entityTypesToIgnore);
+ } else { // Combination of above 2 cases.
+ decision = isMatch(typeName, this.entityTypesToIgnore) &&
isMatch(qualifiedName, this.entitiesToIgnore);
+ }
}
return decision;
}
+
+ private void filterRelationshipAttributes(Map<String, Object>
relationshipAttributes, PreprocessorContext context) {
+ if (relationshipAttributes == null) {
+ return;
+ }
+ List<String> keysToRemove = new ArrayList<>();
+
+ for (Map.Entry<String, Object> entry :
relationshipAttributes.entrySet()) {
+ Object obj = entry.getValue();
+
+ if (obj instanceof Collection) {
+ Collection entities = (Collection) obj;
+
+ entities.removeIf((Object entity) -> {
+ if (isToBeIgnored(entity)) {
+ context.addToIgnoredEntities(entity);
+ return true;
+ }
+ return false;
+ });
+ } else {
+ if (isToBeIgnored(obj)) {
+ context.addToIgnoredEntities(obj);
+ keysToRemove.add(entry.getKey());
+ }
+ }
+ }
+ for (String key : keysToRemove) {
+ relationshipAttributes.remove(key);
+ }
+ }
}
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
index 1dcb97c9a..463babc8d 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
@@ -173,7 +173,7 @@ public class GenericEntityPreprocessorTest {
@Test
public void testPreprocess_ShouldIgnore_WhenEntityMatches() {
when(entity.getTypeName()).thenReturn("dbType123");
-
when(entity.getAttribute("qualifiedName")).thenReturn("my_ignore_table");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"my_ignore_table"));
preprocessor.preprocess(entity, context);
@@ -218,9 +218,9 @@ public class GenericEntityPreprocessorTest {
new
GenericEntityPreprocessor(Collections.singletonList(Pattern.compile("dbType.*")),
Collections.emptyList());
when(entity.getTypeName()).thenReturn("dbTypeXYZ");
- when(entity.getAttribute("qualifiedName")).thenReturn("someTable");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"someTable"));
- Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
AtlasEntity.class);
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
method.setAccessible(true);
boolean result = (boolean) method.invoke(typeOnlyPreprocessor, entity);
@@ -233,9 +233,9 @@ public class GenericEntityPreprocessorTest {
new GenericEntityPreprocessor(Collections.emptyList(),
Collections.singletonList(Pattern.compile(".*ignore.*")));
when(entity.getTypeName()).thenReturn("anyType");
-
when(entity.getAttribute("qualifiedName")).thenReturn("my_ignore_table");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"my_ignore_table"));
- Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
AtlasEntity.class);
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
method.setAccessible(true);
boolean result = (boolean) method.invoke(qnameOnlyPreprocessor,
entity);
@@ -245,9 +245,9 @@ public class GenericEntityPreprocessorTest {
@Test
public void testIsToBeIgnored_BothConditions() throws Exception {
when(entity.getTypeName()).thenReturn("dbType123");
-
when(entity.getAttribute("qualifiedName")).thenReturn("some_ignore_table");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"some_ignore_table"));
- Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
AtlasEntity.class);
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
method.setAccessible(true);
boolean result = (boolean) method.invoke(preprocessor, entity);
@@ -257,9 +257,9 @@ public class GenericEntityPreprocessorTest {
@Test
public void testIsToBeIgnored_NoMatch() throws Exception {
when(entity.getTypeName()).thenReturn("normalType");
- when(entity.getAttribute("qualifiedName")).thenReturn("safe_table");
+
when(entity.getAttributes()).thenReturn(Collections.singletonMap("qualifiedName",
"safe_table"));
- Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
AtlasEntity.class);
+ Method method =
GenericEntityPreprocessor.class.getDeclaredMethod("isToBeIgnored",
Object.class);
method.setAccessible(true);
boolean result = (boolean) method.invoke(preprocessor, entity);