Repository: atlas
Updated Branches:
  refs/heads/master d8b868339 -> 6fb2a0388
ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery

Signed-off-by: Madhan Neethiraj <mad...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6fb2a038
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6fb2a038
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6fb2a038

Branch: refs/heads/master
Commit: 6fb2a0388a89369ceec57a74954d129df600e163
Parents: d8b8683
Author: ashutoshm <ames...@hortonworks.com>
Authored: Thu Jul 27 15:10:36 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Jul 28 18:18:19 2017 -0700

----------------------------------------------------------------------
 .../graph/GraphBackedMetadataRepository.java | 1 -
 .../store/graph/AtlasTypeDefGraphStore.java | 6 +
 .../store/graph/v1/AtlasEntityStoreV1.java | 4 +-
 .../store/graph/v1/AtlasGraphUtilsV1.java | 106 ++++++++++-
 .../GraphBackedMetadataRepositoryTest.java | 3 +-
 .../graph/v1/AtlasDeleteHandlerV1Test.java | 7 +-
 .../graph/v1/AtlasRelationshipStoreV1Test.java | 10 +-
 .../notification/NotificationHookConsumer.java | 175 ++++++++++---------
 8 files changed, 221 insertions(+), 91 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 0f3b06b..50b7116 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -168,7 +168,6 @@ public class 
GraphBackedMetadataRepository implements MetadataRepository { } @Override - @GraphTransaction public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException { return getEntityDefinitions(guid).get(0); } http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index 517da68..3638e19 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -378,6 +378,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } + try { + ttr.updateTypes(ret); + } catch (AtlasBaseException e) { // this shouldn't happen, as the types were already validated + LOG.error("failed to update the registry after updating the store", e); + } + if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret); } http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index f340330..1c168b4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -307,7 +307,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { entity.setGuid(guid); - return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true); + return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false); } @Override @@ -358,7 +358,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName()); } - return createOrUpdate(new AtlasEntityStream(updateEntity), true); + return createOrUpdate(new AtlasEntityStream(updateEntity), true, false); } @Override http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index 948d9dd..227f7cd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -18,7 +18,9 @@ package org.apache.atlas.repository.store.graph.v1; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.discovery.SearchProcessor; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; @@ -29,12 +31,14 @@ import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import 
org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasType; import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +47,7 @@ import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.Map; +import java.util.Set; /** * Utility methods for Graph. @@ -55,6 +60,19 @@ public class AtlasGraphUtilsV1 { public static final String VERTEX_TYPE = "typeSystem"; public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType"; + private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false; + + static { + try { + Configuration conf = ApplicationProperties.get(); + + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES); + } catch (Exception excp) { + LOG.error("Error reading configuration", excp); + } finally { + LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES); + } + } public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) { return getTypeDefPropertyKey(typeDef.getName()); @@ -217,13 +235,22 @@ public class AtlasGraphUtilsV1 { continue; } - vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue); + if (canUseIndexQuery(entityType, attribute.getName())) { + vertex = AtlasGraphUtilsV1.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue); + } else { + vertex = 
AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue); - if (vertex == null) { - vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue); + if (vertex == null) { + vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue); + } } if (vertex != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}", + entityType.getTypeName(), attribute.getName(), attrValue, vertex); + } + break; } } @@ -366,4 +393,77 @@ public class AtlasGraphUtilsV1 { public static String getStateAsString(AtlasElement element) { return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class); } + + private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) { + boolean ret = false; + + if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) { + final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr(); + + ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES; + + if (ret) { + Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys(); + try { + ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName)); + } + catch (AtlasBaseException ex) { + ret = false; + } + } + } + + return ret; + } + + private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) { + String propertyName = attribute.getVertexPropertyName(); + AtlasIndexQuery query = getIndexQuery(entityType, propertyName, attrVal.toString()); + + for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) { + AtlasIndexQuery.Result result = iter.next(); + AtlasVertex vertex = result.getVertex(); + + // skip non-entity vertices, if any got returned + if (vertex 
== null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) { + continue; + } + + // verify the typeName + String typeNameInVertex = getTypeName(vertex); + + if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) { + LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex); + + continue; + } + + if (attrVal.getClass() == String.class) { + String s = (String) attrVal; + String vertexVal = vertex.getProperty(propertyName, String.class); + + if (!s.equalsIgnoreCase(vertexVal)) { + LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal); + + continue; + } + } + + return vertex; + } + + return null; + } + + private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) { + StringBuilder sb = new StringBuilder(); + + sb.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr()) + .append(" AND ") + .append("v.\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value)) + .append(" AND ") + .append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE"); + + return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString()); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java index 8120aaa..f372891 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java +++ 
b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java @@ -474,7 +474,6 @@ public class GraphBackedMetadataRepositoryTest { return guid; } - @GraphTransaction AtlasVertex getTableEntityVertex() { AtlasGraph graph = TestUtils.getGraph(); AtlasGraphQuery query = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, ComparisionOperator.EQUAL, TestUtils.TABLE_TYPE); @@ -651,6 +650,7 @@ public class GraphBackedMetadataRepositoryTest { } @Test(dependsOnMethods = "testCreateEntity") + @GraphTransaction public void testGetIdFromVertex() throws Exception { AtlasVertex tableVertex = getTableEntityVertex(); @@ -664,6 +664,7 @@ public class GraphBackedMetadataRepositoryTest { } @Test(dependsOnMethods = "testCreateEntity") + @GraphTransaction public void testGetTypeName() throws Exception { AtlasVertex tableVertex = getTableEntityVertex(); Assert.assertEquals(GraphHelper.getTypeName(tableVertex), TestUtils.TABLE_TYPE); http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java index 9331e35..62ef21c 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java @@ -41,6 +41,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import 
org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetadataService; import org.apache.atlas.store.AtlasTypeDefStore; @@ -129,7 +130,11 @@ public abstract class AtlasDeleteHandlerV1Test { ImmutableList.<AtlasClassificationDef>of(), ImmutableList.of(mapValueDef, mapOwnerDef)); - typeDefStore.createTypesDef(typesDef); + AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry); + + if (!typesToCreate.isEmpty()) { + typeDefStore.createTypesDef(typesToCreate); + } compositeMapOwnerType = typeRegistry.getEntityTypeByName("CompositeMapOwner"); compositeMapValueType = typeRegistry.getEntityTypeByName("CompositeMapValue"); http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java index 3ebda0d..263ad5b 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java @@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; import org.apache.atlas.store.AtlasTypeDefStore; @@ -97,8 +98,13 @@ public abstract class AtlasRelationshipStoreV1Test 
{ } init(); - AtlasTypesDef testTypes = getInverseReferenceTestTypes(); - typeDefStore.createTypesDef(testTypes); + AtlasTypesDef typesDef = getInverseReferenceTestTypes(); + + AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry); + + if (!typesToCreate.isEmpty()) { + typeDefStore.createTypesDef(typesToCreate); + } } @BeforeTest http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 51276d3..b8255b3 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -42,6 +42,7 @@ import org.apache.atlas.service.Service; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.DateTimeHelper; @@ -71,6 +72,7 @@ import static org.apache.atlas.AtlasClientV2.*; @Order(4) public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); private static final String LOCALHOST = "localhost"; private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); @@ -236,113 +238,124 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @VisibleForTesting void 
handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException { + AtlasPerfTracer perf = null; + HookNotificationMessage message = kafkaMsg.getMessage(); String messageUser = message.getUser(); - // Used for intermediate conversions during create and update - AtlasEntity.AtlasEntitiesWithExtInfo entities; - for (int numRetries = 0; numRetries < maxRetries; numRetries++) { - if (LOG.isDebugEnabled()) { - LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); - } - try { - RequestContext requestContext = RequestContext.createContext(); - requestContext.setUser(messageUser); - switch (message.getType()) { - case ENTITY_CREATE: - EntityCreateRequest createRequest = (EntityCreateRequest) message; + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); + } - if (numRetries == 0) { // audit only on the first attempt - audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath()); - } + try { + // Used for intermediate conversions during create and update + AtlasEntity.AtlasEntitiesWithExtInfo entities; + for (int numRetries = 0; numRetries < maxRetries; numRetries++) { + if (LOG.isDebugEnabled()) { + LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); + } + try { + RequestContext requestContext = RequestContext.createContext(); + requestContext.setUser(messageUser); - entities = instanceConverter.toAtlasEntities(createRequest.getEntities()); + switch (message.getType()) { + case ENTITY_CREATE: + EntityCreateRequest createRequest = (EntityCreateRequest) message; - atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); - break; + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath()); + } - case ENTITY_PARTIAL_UPDATE: - final EntityPartialUpdateRequest partialUpdateRequest = 
(EntityPartialUpdateRequest) message; + entities = instanceConverter.toAtlasEntities(createRequest.getEntities()); - if (numRetries == 0) { // audit only on the first attempt - audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(), - String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName())); - } + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; - Referenceable referenceable = partialUpdateRequest.getEntity(); - entities = instanceConverter.toAtlasEntity(referenceable); + case ENTITY_PARTIAL_UPDATE: + final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message; - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); - String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() { - { - put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(), + String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName())); } - }); - // There should only be one root entity - entities.getEntities().get(0).setGuid(guid); + Referenceable referenceable = partialUpdateRequest.getEntity(); + entities = instanceConverter.toAtlasEntity(referenceable); - atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); - break; + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() { + { + put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + } + }); - case ENTITY_DELETE: - final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; + // There should only be one root entity + entities.getEntities().get(0).setGuid(guid); - if (numRetries == 0) { 
// audit only on the first attempt - audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), - String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName())); - } + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); + break; - try { - AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); - atlasEntityStore.deleteByUniqueAttributes(type, - new HashMap<String, Object>() {{ - put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); - }}); - } catch (ClassCastException cle) { - LOG.error("Failed to do a partial update on Entity"); - } - break; + case ENTITY_DELETE: + final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; - case ENTITY_FULL_UPDATE: - EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), + String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName())); + } - if (numRetries == 0) { // audit only on the first attempt - audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath()); - } + try { + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); + atlasEntityStore.deleteByUniqueAttributes(type, + new HashMap<String, Object>() {{ + put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); + }}); + } catch (ClassCastException cle) { + LOG.error("Failed to do a partial update on Entity"); + } + break; - entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); - atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); - break; + case ENTITY_FULL_UPDATE: + EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; - default: - throw new IllegalStateException("Unknown notification type: " + message.getType().name()); - } + if (numRetries == 0) { // audit only on the first attempt + audit(messageUser, 
UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath()); + } - break; - } catch (Throwable e) { - LOG.warn("Error handling message", e); - try { - LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); - Thread.sleep(consumerRetryInterval); - } catch (InterruptedException ie) { - LOG.error("Notification consumer thread sleep interrupted"); - } + entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; + + default: + throw new IllegalStateException("Unknown notification type: " + message.getType().name()); + } - if (numRetries == (maxRetries - 1)) { - LOG.warn("Max retries exceeded for message {}", message, e); - failedMessages.add(message); - if (failedMessages.size() >= failedMsgCacheSize) { - recordFailedMessages(); + break; + } catch (Throwable e) { + LOG.warn("Error handling message", e); + try { + LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + Thread.sleep(consumerRetryInterval); + } catch (InterruptedException ie) { + LOG.error("Notification consumer thread sleep interrupted"); + } + + if (numRetries == (maxRetries - 1)) { + LOG.warn("Max retries exceeded for message {}", message, e); + failedMessages.add(message); + if (failedMessages.size() >= failedMsgCacheSize) { + recordFailedMessages(); + } + return; } - return; + } finally { + RequestContext.clear(); + RequestContextV1.clear(); } - } finally { - RequestContext.clear(); - RequestContextV1.clear(); } + commit(kafkaMsg); + } finally { + AtlasPerfTracer.log(perf); } - commit(kafkaMsg); } private void recordFailedMessages() {