This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 59648d2 ATLAS-3279: avoid unnecessary retrieval of entity-extended
info while sending notifications
59648d2 is described below
commit 59648d289ee8a8c36c1f8e4ba7f3807418a64639
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Sat Jun 15 08:53:00 2019 -0700
ATLAS-3279: avoid unnecessary retrieval of entity-extended info while
sending notifications
Change-Id: I82e0bba27010709c74cd98a93f8a9c617577535e
---
.../atlas/repository/graph/FullTextMapperV2.java | 6 +-
.../store/graph/v2/AtlasEntityChangeNotifier.java | 161 +++++++++++----------
2 files changed, 90 insertions(+), 77 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
index caa6604..0f2b4bf 100644
---
a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java
@@ -271,11 +271,11 @@ public class FullTextMapperV2 {
}
}
- private AtlasEntity getAndCacheEntity(String guid) throws
AtlasBaseException {
+ public AtlasEntity getAndCacheEntity(String guid) throws
AtlasBaseException {
return getAndCacheEntity(guid, true);
}
- private AtlasEntity getAndCacheEntity(String guid, boolean
includeReferences) throws AtlasBaseException {
+ public AtlasEntity getAndCacheEntity(String guid, boolean
includeReferences) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid);
@@ -294,7 +294,7 @@ public class FullTextMapperV2 {
return entity;
}
- private AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid)
throws AtlasBaseException {
+ public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid)
throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo =
context.getEntityWithExtInfo(guid);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index 2e47a50..c910d9e 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -92,7 +92,7 @@ public class AtlasEntityChangeNotifier {
}
public void onEntitiesMutated(EntityMutationResponse
entityMutationResponse, boolean isImport) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(entityChangeListeners) ||
instanceConverter == null) {
+ if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
}
@@ -118,7 +118,7 @@ public class AtlasEntityChangeNotifier {
}
public void notifyRelationshipMutation(AtlasRelationship relationship,
EntityNotification.EntityNotificationV2.OperationType operationType) throws
AtlasBaseException {
- if (CollectionUtils.isEmpty(entityChangeListeners) ||
instanceConverter == null) {
+ if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
}
@@ -145,75 +145,76 @@ public class AtlasEntityChangeNotifier {
} else {
updateFullTextMapping(entity.getGuid(), addedClassifications);
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(addedClassifications);
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(addedClassifications);
- if (entity == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
+ if (entity == null || CollectionUtils.isEmpty(traits)) {
+ return;
+ }
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsAdded(entityRef, traits);
- } catch (AtlasException e) {
- throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitAdd");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsAdded(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitAdd");
+ }
}
}
}
}
public void onClassificationUpdatedToEntity(AtlasEntity entity,
List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled) {
- doFullTextMapping(entity.getGuid());
+ doFullTextMapping(entity.getGuid());
+ if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsUpdated(entity,
updatedClassifications);
}
} else {
- doFullTextMapping(entity.getGuid());
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(updatedClassifications);
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(updatedClassifications);
-
- if (entityRef == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
+ if (entityRef == null || CollectionUtils.isEmpty(traits)) {
+ return;
+ }
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsUpdated(entityRef, traits);
- } catch (AtlasException e) {
- throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitUpdate");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsUpdated(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitUpdate");
+ }
}
}
}
}
public void onClassificationDeletedFromEntity(AtlasEntity entity,
List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled) {
- doFullTextMapping(entity.getGuid());
+ doFullTextMapping(entity.getGuid());
+ if (isV2EntityNotificationEnabled) {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onClassificationsDeleted(entity,
deletedClassifications);
}
} else {
- doFullTextMapping(entity.getGuid());
-
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(deletedClassifications);
+ if (instanceConverter != null) {
+ Referenceable entityRef = toReferenceable(entity.getGuid());
+ List<Struct> traits = toStruct(deletedClassifications);
- if (entityRef == null ||
CollectionUtils.isEmpty(deletedClassifications)) {
- return;
- }
+ if (entityRef == null ||
CollectionUtils.isEmpty(deletedClassifications)) {
+ return;
+ }
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsDeleted(entityRef, traits);
- } catch (AtlasException e) {
- throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitDelete");
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ listener.onTraitsDeleted(entityRef, traits);
+ } catch (AtlasException e) {
+ throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), "TraitDelete");
+ }
}
}
-
}
}
@@ -223,7 +224,7 @@ public class AtlasEntityChangeNotifier {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermAdded(term, entityIds);
}
- } else {
+ } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
@@ -242,7 +243,7 @@ public class AtlasEntityChangeNotifier {
for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
listener.onTermDeleted(term, entityIds);
}
- } else {
+ } else if (instanceConverter != null) {
List<Referenceable> entityRefs = toReferenceables(entityIds);
for (EntityChangeListener listener : entityChangeListeners) {
@@ -277,7 +278,7 @@ public class AtlasEntityChangeNotifier {
continue;
}
- AtlasEntity entity = instanceConverter.getAndCacheEntity(guid);
+ AtlasEntity entity = fullTextMapperV2.getAndCacheEntity(guid);
if (entity == null) {
continue;
@@ -300,11 +301,15 @@ public class AtlasEntityChangeNotifier {
return;
}
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("notifyListeners");
+
if (isV2EntityNotificationEnabled) {
notifyV2Listeners(entityHeaders, operation, isImport);
} else {
notifyV1Listeners(entityHeaders, operation, isImport);
}
+
+ RequestContext.get().endMetricRecord(metric);
}
private void notifyRelationshipListeners(List<AtlasRelationship>
relationships, EntityOperation operation, boolean isImport) throws
AtlasBaseException {
@@ -322,24 +327,26 @@ public class AtlasEntityChangeNotifier {
private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders,
EntityOperation operation, boolean isImport) throws AtlasBaseException {
- List<Referenceable> typedRefInsts = toReferenceables(entityHeaders,
operation);
+ if (instanceConverter != null) {
+ List<Referenceable> typedRefInsts =
toReferenceables(entityHeaders, operation);
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- switch (operation) {
- case CREATE:
- listener.onEntitiesAdded(typedRefInsts, isImport);
- break;
- case UPDATE:
- case PARTIAL_UPDATE:
- listener.onEntitiesUpdated(typedRefInsts, isImport);
- break;
- case DELETE:
- listener.onEntitiesDeleted(typedRefInsts, isImport);
- break;
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ switch (operation) {
+ case CREATE:
+ listener.onEntitiesAdded(typedRefInsts, isImport);
+ break;
+ case UPDATE:
+ case PARTIAL_UPDATE:
+ listener.onEntitiesUpdated(typedRefInsts,
isImport);
+ break;
+ case DELETE:
+ listener.onEntitiesDeleted(typedRefInsts,
isImport);
+ break;
+ }
+ } catch (AtlasException e) {
+ throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), operation.toString());
}
- } catch (AtlasException e) {
- throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e,
getListenerName(listener), operation.toString());
}
}
}
@@ -383,17 +390,19 @@ public class AtlasEntityChangeNotifier {
}
}
- private List<Referenceable> toReferenceables(List<AtlasEntityHeader>
entityHeaders, EntityOperation operation) throws AtlasBaseException {
+ private List<Referenceable> toReferenceables(List<AtlasEntityHeader>
entityHeaders, EntityOperation operation) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
- // delete notifications don't need all attributes. Hence the special
handling for delete operation
- if (operation == EntityOperation.DELETE) {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(new Referenceable(entityHeader.getGuid(),
entityHeader.getTypeName(), entityHeader.getAttributes()));
- }
- } else {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(toReferenceable(entityHeader.getGuid()));
+ if (instanceConverter != null) {
+ // delete notifications don't need all attributes. Hence the
special handling for delete operation
+ if (operation == EntityOperation.DELETE) {
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ ret.add(new Referenceable(entityHeader.getGuid(),
entityHeader.getTypeName(), entityHeader.getAttributes()));
+ }
+ } else {
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ ret.add(toReferenceable(entityHeader.getGuid()));
+ }
}
}
@@ -403,7 +412,7 @@ public class AtlasEntityChangeNotifier {
private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId>
entityIds) throws AtlasBaseException {
List<Referenceable> ret = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(entityIds)) {
+ if (instanceConverter != null &&
CollectionUtils.isNotEmpty(entityIds)) {
for (AtlasRelatedObjectId relatedObjectId : entityIds) {
String entityGuid = relatedObjectId.getGuid();
@@ -417,17 +426,17 @@ public class AtlasEntityChangeNotifier {
private Referenceable toReferenceable(String entityId) throws
AtlasBaseException {
Referenceable ret = null;
- if (StringUtils.isNotEmpty(entityId)) {
+ if (instanceConverter != null && StringUtils.isNotEmpty(entityId)) {
ret = instanceConverter.getReferenceable(entityId);
}
return ret;
}
- private List<Struct> toStruct(List<AtlasClassification>
classifications) throws AtlasBaseException {
+ private List<Struct> toStruct(List<AtlasClassification> classifications)
throws AtlasBaseException {
List<Struct> ret = null;
- if (classifications != null) {
+ if (instanceConverter != null && classifications != null) {
ret = new ArrayList<>(classifications.size());
for (AtlasClassification classification : classifications) {
@@ -468,7 +477,7 @@ public class AtlasEntityChangeNotifier {
} else {
String entityGuid = entityHeader.getGuid();
- entity = instanceConverter.getAndCacheEntity(entityGuid);
+ entity = fullTextMapperV2.getAndCacheEntity(entityGuid);
}
if (entity != null) {
@@ -545,6 +554,10 @@ public class AtlasEntityChangeNotifier {
}
private void doFullTextMapping(String guid) {
+ if(AtlasRepositoryConfiguration.isFreeTextSearchEnabled() ||
!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
+ return;
+ }
+
AtlasEntityHeader entityHeader = new AtlasEntityHeader();
entityHeader.setGuid(guid);