Repository: atlas
Updated Branches:
  refs/heads/master d8b868339 -> 6fb2a0388


ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6fb2a038
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6fb2a038
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6fb2a038

Branch: refs/heads/master
Commit: 6fb2a0388a89369ceec57a74954d129df600e163
Parents: d8b8683
Author: ashutoshm <ames...@hortonworks.com>
Authored: Thu Jul 27 15:10:36 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Jul 28 18:18:19 2017 -0700

----------------------------------------------------------------------
 .../graph/GraphBackedMetadataRepository.java    |   1 -
 .../store/graph/AtlasTypeDefGraphStore.java     |   6 +
 .../store/graph/v1/AtlasEntityStoreV1.java      |   4 +-
 .../store/graph/v1/AtlasGraphUtilsV1.java       | 106 ++++++++++-
 .../GraphBackedMetadataRepositoryTest.java      |   3 +-
 .../graph/v1/AtlasDeleteHandlerV1Test.java      |   7 +-
 .../graph/v1/AtlasRelationshipStoreV1Test.java  |  10 +-
 .../notification/NotificationHookConsumer.java  | 175 ++++++++++---------
 8 files changed, 221 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 0f3b06b..50b7116 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -168,7 +168,6 @@ public class GraphBackedMetadataRepository implements 
MetadataRepository {
     }
 
     @Override
-    @GraphTransaction
     public ITypedReferenceableInstance getEntityDefinition(String guid) throws 
RepositoryException, EntityNotFoundException {
         return getEntityDefinitions(guid).get(0);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index 517da68..3638e19 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -378,6 +378,12 @@ public abstract class AtlasTypeDefGraphStore implements 
AtlasTypeDefStore, Activ
             }
         }
 
+        try {
+            ttr.updateTypes(ret);
+        } catch (AtlasBaseException e) { // this shouldn't happen, as the 
types were already validated
+            LOG.error("failed to update the registry after updating the 
store", e);
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, 
{}): {}", typesToCreate, typesToUpdate, ret);
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index f340330..1c168b4 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -307,7 +307,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
 
         entity.setGuid(guid);
 
-        return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true);
+        return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, 
false);
     }
 
     @Override
@@ -358,7 +358,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore 
{
                 throw new 
AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, 
attrType.getTypeName());
         }
 
-        return createOrUpdate(new AtlasEntityStream(updateEntity), true);
+        return createOrUpdate(new AtlasEntityStream(updateEntity), true, 
false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 948d9dd..227f7cd 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -18,7 +18,9 @@
 package org.apache.atlas.repository.store.graph.v1;
 
 
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.discovery.SearchProcessor;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -29,12 +31,14 @@ import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Utility methods for Graph.
@@ -55,6 +60,19 @@ public class AtlasGraphUtilsV1 {
     public static final String VERTEX_TYPE          = "typeSystem";
     public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + 
".relationshipType";
 
+    private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES 
= false;
+
+    static {
+        try {
+            Configuration conf = ApplicationProperties.get();
+
+            USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = 
conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", 
USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+        } catch (Exception excp) {
+            LOG.error("Error reading configuration", excp);
+        } finally {
+            
LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + 
USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+        }
+    }
 
     public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
         return getTypeDefPropertyKey(typeDef.getName());
@@ -217,13 +235,22 @@ public class AtlasGraphUtilsV1 {
                     continue;
                 }
 
-                vertex = 
AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), 
attribute.getVertexPropertyName(), attrValue);
+                if (canUseIndexQuery(entityType, attribute.getName())) {
+                    vertex = 
AtlasGraphUtilsV1.getAtlasVertexFromIndexQuery(entityType, attribute, 
attrValue);
+                } else {
+                    vertex = 
AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), 
attribute.getVertexPropertyName(), attrValue);
 
-                if (vertex == null) {
-                    vertex = 
AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), 
attribute.getVertexPropertyName(), attrValue);
+                    if (vertex == null) {
+                        vertex = 
AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), 
attribute.getVertexPropertyName(), attrValue);
+                    }
                 }
 
                 if (vertex != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("findByUniqueAttributes(type={}, 
attrName={}, attrValue={}: found vertex {}",
+                                  entityType.getTypeName(), 
attribute.getName(), attrValue, vertex);
+                    }
+
                     break;
                 }
             }
@@ -366,4 +393,77 @@ public class AtlasGraphUtilsV1 {
     public static String getStateAsString(AtlasElement element) {
         return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
     }
+
+    private static boolean canUseIndexQuery(AtlasEntityType entityType, String 
attributeName) {
+        boolean ret = false;
+
+        if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
+            final String typeAndSubTypesQryStr = 
entityType.getTypeAndAllSubTypesQryStr();
+
+            ret = typeAndSubTypesQryStr.length() <= 
SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
+
+            if (ret) {
+                Set<String> indexSet = 
AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
+                try {
+                    ret = 
indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
+                }
+                catch (AtlasBaseException ex) {
+                    ret = false;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType 
entityType, AtlasAttribute attribute, Object attrVal) {
+        String          propertyName = attribute.getVertexPropertyName();
+        AtlasIndexQuery query        = getIndexQuery(entityType, propertyName, 
attrVal.toString());
+
+        for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); 
iter.hasNext(); ) {
+            AtlasIndexQuery.Result result = iter.next();
+            AtlasVertex            vertex = result.getVertex();
+
+            // skip non-entity vertices, if any got returned
+            if (vertex == null || 
!vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
+                continue;
+            }
+
+            // verify the typeName
+            String typeNameInVertex = getTypeName(vertex);
+
+            if 
(!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
+                LOG.warn("incorrect vertex type from index-query: 
expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);
+
+                continue;
+            }
+
+            if (attrVal.getClass() == String.class) {
+                String s         = (String) attrVal;
+                String vertexVal = vertex.getProperty(propertyName, 
String.class);
+
+                if (!s.equalsIgnoreCase(vertexVal)) {
+                    LOG.warn("incorrect match from index-query for property 
{}: expected='{}'; found='{}'", propertyName, s, vertexVal);
+
+                    continue;
+                }
+            }
+
+            return vertex;
+        }
+
+        return null;
+    }
+
+    private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, 
String propertyName, String value) {
+        StringBuilder sb = new StringBuilder();
+
+        
sb.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
+                .append(" AND ")
+                
.append("v.\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
+                .append(" AND ")
+                
.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
+
+        return 
AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, 
sb.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 8120aaa..f372891 100755
--- 
a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -474,7 +474,6 @@ public class GraphBackedMetadataRepositoryTest {
         return guid;
     }
 
-    @GraphTransaction
     AtlasVertex getTableEntityVertex() {
         AtlasGraph graph = TestUtils.getGraph();
         AtlasGraphQuery query = 
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, 
ComparisionOperator.EQUAL, TestUtils.TABLE_TYPE);
@@ -651,6 +650,7 @@ public class GraphBackedMetadataRepositoryTest {
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
+    @GraphTransaction
     public void testGetIdFromVertex() throws Exception {
         AtlasVertex tableVertex = getTableEntityVertex();
 
@@ -664,6 +664,7 @@ public class GraphBackedMetadataRepositoryTest {
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
+    @GraphTransaction
     public void testGetTypeName() throws Exception {
         AtlasVertex tableVertex = getTableEntityVertex();
         Assert.assertEquals(GraphHelper.getTypeName(tableVertex), 
TestUtils.TABLE_TYPE);

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
index 9331e35..62ef21c 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
@@ -41,6 +41,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import 
org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetadataService;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -129,7 +130,11 @@ public abstract class AtlasDeleteHandlerV1Test {
             ImmutableList.<AtlasClassificationDef>of(),
             ImmutableList.of(mapValueDef, mapOwnerDef));
 
-        typeDefStore.createTypesDef(typesDef);
+        AtlasTypesDef typesToCreate = 
AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+        if (!typesToCreate.isEmpty()) {
+            typeDefStore.createTypesDef(typesToCreate);
+        }
 
         compositeMapOwnerType = 
typeRegistry.getEntityTypeByName("CompositeMapOwner");
         compositeMapValueType = 
typeRegistry.getEntityTypeByName("CompositeMapValue");

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
index 3ebda0d..263ad5b 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
@@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import 
org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -97,8 +98,13 @@ public abstract class AtlasRelationshipStoreV1Test {
         }
 
         init();
-        AtlasTypesDef testTypes = getInverseReferenceTestTypes();
-        typeDefStore.createTypesDef(testTypes);
+        AtlasTypesDef typesDef = getInverseReferenceTestTypes();
+
+        AtlasTypesDef typesToCreate = 
AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+        if (!typesToCreate.isEmpty()) {
+            typeDefStore.createTypesDef(typesToCreate);
+        }
     }
 
     @BeforeTest

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 51276d3..b8255b3 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -42,6 +42,7 @@ import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
@@ -71,6 +72,7 @@ import static org.apache.atlas.AtlasClientV2.*;
 @Order(4)
 public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(NotificationHookConsumer.class);
+    private static final Logger PERF_LOG = 
AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
     private static final String LOCALHOST = "localhost";
     private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
 
@@ -236,113 +238,124 @@ public class NotificationHookConsumer implements 
Service, ActiveStateChangeHandl
 
         @VisibleForTesting
         void handleMessage(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMsg) throws AtlasServiceException, AtlasException {
+            AtlasPerfTracer perf = null;
+
             HookNotificationMessage message = kafkaMsg.getMessage();
             String messageUser = message.getUser();
-            // Used for intermediate conversions during create and update
-            AtlasEntity.AtlasEntitiesWithExtInfo entities;
-            for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("handleMessage({}): attempt {}", 
message.getType().name(), numRetries);
-                }
-                try {
-                    RequestContext requestContext = 
RequestContext.createContext();
-                    requestContext.setUser(messageUser);
 
-                    switch (message.getType()) {
-                        case ENTITY_CREATE:
-                            EntityCreateRequest createRequest = 
(EntityCreateRequest) message;
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
message.getType().name());
+            }
 
-                            if (numRetries == 0) { // audit only on the first 
attempt
-                                audit(messageUser, CREATE_ENTITY.getMethod(), 
CREATE_ENTITY.getPath());
-                            }
+            try {
+                // Used for intermediate conversions during create and update
+                AtlasEntity.AtlasEntitiesWithExtInfo entities;
+                for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("handleMessage({}): attempt {}", 
message.getType().name(), numRetries);
+                    }
+                    try {
+                        RequestContext requestContext = 
RequestContext.createContext();
+                        requestContext.setUser(messageUser);
 
-                            entities = 
instanceConverter.toAtlasEntities(createRequest.getEntities());
+                        switch (message.getType()) {
+                            case ENTITY_CREATE:
+                                EntityCreateRequest createRequest = 
(EntityCreateRequest) message;
 
-                            atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
-                            break;
+                                if (numRetries == 0) { // audit only on the 
first attempt
+                                    audit(messageUser, 
CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
+                                }
 
-                        case ENTITY_PARTIAL_UPDATE:
-                            final EntityPartialUpdateRequest 
partialUpdateRequest = (EntityPartialUpdateRequest) message;
+                                entities = 
instanceConverter.toAtlasEntities(createRequest.getEntities());
 
-                            if (numRetries == 0) { // audit only on the first 
attempt
-                                audit(messageUser, 
UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                        
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), 
partialUpdateRequest.getTypeName()));
-                            }
+                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                break;
 
-                            Referenceable referenceable = 
partialUpdateRequest.getEntity();
-                            entities = 
instanceConverter.toAtlasEntity(referenceable);
+                            case ENTITY_PARTIAL_UPDATE:
+                                final EntityPartialUpdateRequest 
partialUpdateRequest = (EntityPartialUpdateRequest) message;
 
-                            AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                            String guid = 
AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, 
Object>() {
-                                {
-                                    put(partialUpdateRequest.getAttribute(), 
partialUpdateRequest.getAttributeValue());
+                                if (numRetries == 0) { // audit only on the 
first attempt
+                                    audit(messageUser, 
UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                            
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), 
partialUpdateRequest.getTypeName()));
                                 }
-                            });
 
-                            // There should only be one root entity
-                            entities.getEntities().get(0).setGuid(guid);
+                                Referenceable referenceable = 
partialUpdateRequest.getEntity();
+                                entities = 
instanceConverter.toAtlasEntity(referenceable);
 
-                            atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), true);
-                            break;
+                                AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+                                String guid = 
AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, 
Object>() {
+                                    {
+                                        
put(partialUpdateRequest.getAttribute(), 
partialUpdateRequest.getAttributeValue());
+                                    }
+                                });
 
-                        case ENTITY_DELETE:
-                            final EntityDeleteRequest deleteRequest = 
(EntityDeleteRequest) message;
+                                // There should only be one root entity
+                                entities.getEntities().get(0).setGuid(guid);
 
-                            if (numRetries == 0) { // audit only on the first 
attempt
-                                audit(messageUser, 
DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                        
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), 
deleteRequest.getTypeName()));
-                            }
+                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), true);
+                                break;
 
-                            try {
-                                AtlasEntityType type = (AtlasEntityType) 
typeRegistry.getType(deleteRequest.getTypeName());
-                                atlasEntityStore.deleteByUniqueAttributes(type,
-                                        new HashMap<String, Object>() {{
-                                            put(deleteRequest.getAttribute(), 
deleteRequest.getAttributeValue());
-                                        }});
-                            } catch (ClassCastException cle) {
-                                LOG.error("Failed to do a partial update on 
Entity");
-                            }
-                            break;
+                            case ENTITY_DELETE:
+                                final EntityDeleteRequest deleteRequest = 
(EntityDeleteRequest) message;
 
-                        case ENTITY_FULL_UPDATE:
-                            EntityUpdateRequest updateRequest = 
(EntityUpdateRequest) message;
+                                if (numRetries == 0) { // audit only on the 
first attempt
+                                    audit(messageUser, 
DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
+                                            
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), 
deleteRequest.getTypeName()));
+                                }
 
-                            if (numRetries == 0) { // audit only on the first 
attempt
-                                audit(messageUser, UPDATE_ENTITY.getMethod(), 
UPDATE_ENTITY.getPath());
-                            }
+                                try {
+                                    AtlasEntityType type = (AtlasEntityType) 
typeRegistry.getType(deleteRequest.getTypeName());
+                                    
atlasEntityStore.deleteByUniqueAttributes(type,
+                                            new HashMap<String, Object>() {{
+                                                
put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
+                                            }});
+                                } catch (ClassCastException cle) {
+                                    LOG.error("Failed to do a partial update 
on Entity");
+                                }
+                                break;
 
-                            entities = 
instanceConverter.toAtlasEntities(updateRequest.getEntities());
-                            atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
-                            break;
+                            case ENTITY_FULL_UPDATE:
+                                EntityUpdateRequest updateRequest = 
(EntityUpdateRequest) message;
 
-                        default:
-                            throw new IllegalStateException("Unknown 
notification type: " + message.getType().name());
-                    }
+                                if (numRetries == 0) { // audit only on the 
first attempt
+                                    audit(messageUser, 
UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
+                                }
 
-                    break;
-                } catch (Throwable e) {
-                    LOG.warn("Error handling message", e);
-                    try {
-                        LOG.info("Sleeping for {} ms before retry", 
consumerRetryInterval);
-                        Thread.sleep(consumerRetryInterval);
-                    } catch (InterruptedException ie) {
-                        LOG.error("Notification consumer thread sleep 
interrupted");
-                    }
+                                entities = 
instanceConverter.toAtlasEntities(updateRequest.getEntities());
+                                atlasEntityStore.createOrUpdate(new 
AtlasEntityStream(entities), false);
+                                break;
+
+                            default:
+                                throw new IllegalStateException("Unknown 
notification type: " + message.getType().name());
+                        }
 
-                    if (numRetries == (maxRetries - 1)) {
-                        LOG.warn("Max retries exceeded for message {}", 
message, e);
-                        failedMessages.add(message);
-                        if (failedMessages.size() >= failedMsgCacheSize) {
-                            recordFailedMessages();
+                        break;
+                    } catch (Throwable e) {
+                        LOG.warn("Error handling message", e);
+                        try {
+                            LOG.info("Sleeping for {} ms before retry", 
consumerRetryInterval);
+                            Thread.sleep(consumerRetryInterval);
+                        } catch (InterruptedException ie) {
+                            LOG.error("Notification consumer thread sleep 
interrupted");
+                        }
+
+                        if (numRetries == (maxRetries - 1)) {
+                            LOG.warn("Max retries exceeded for message {}", 
message, e);
+                            failedMessages.add(message);
+                            if (failedMessages.size() >= failedMsgCacheSize) {
+                                recordFailedMessages();
+                            }
+                            return;
                         }
-                        return;
+                    } finally {
+                        RequestContext.clear();
+                        RequestContextV1.clear();
                     }
-                } finally {
-                    RequestContext.clear();
-                    RequestContextV1.clear();
                 }
+                commit(kafkaMsg);
+            } finally {
+                AtlasPerfTracer.log(perf);
             }
-            commit(kafkaMsg);
         }
 
         private void recordFailedMessages() {

Reply via email to